Permalink
Browse files

Merge commit 'gnufied/master'

  • Loading branch information...
2 parents 45029fc + db233a8 commit 5438cc0e2e91ba35c216027348233e48b95693bb Seth Morabito committed Sep 23, 2008
View
@@ -9,3 +9,4 @@ doc/output/scheduling
doc/output/workers
doc/output/index.html
coverage
+.dotest
View
@@ -1,3 +1,9 @@
+2008-09-07 hemant kumar <hemant@shire.com>
+
+ * Commit patch by P Baker, related to scheduling a persistent task at specified time. For example:
+ MiddleMan(:hello_worker).enq_some_task(:arg => "hello_world",
+ :job_key => "boy",:scheduled_at => (Time.now + 1.hour))
+
2008-06-19 hemant kumar <hemant@shire.com>
* Make binary parser iterative and hence won't blow your stack
View
5 README
@@ -13,11 +13,6 @@ Copyright (c) 2006 Ezra Zygmuntowicz,skaar[at]waste[dot]org,
Copyright (c) 2007 Hemant Kumar (gethemant [at] gmail.com )
-== Notes
-
- If you are using UNIX styled scheduler and using activesupport helpers in yaml file, please note that,
- that will not work currently, because activesupport gem is not loaded before reading yaml file.
-
== Usage
Please look into http://backgroundrb.rubyforge.org
@@ -79,6 +79,25 @@ rails using:
You can as usual use @worker_key@ if *worker was started with a worker_key*.
+p(sub-title). Enqueue task to the persistent job queue :
+
+Jobs executed via synchronous and asynchronous APIs are fine, but these tasks are usually
+kept in memory(and hence they are fast) and hence aren't entirely failsafe.
+
+To solve this _BackgrounDRb_ also lets you add jobs to a persistent job queue, which is
+automatically picked by responsible worker and invoked. To use this:
+
+<pre class="boxed">MiddleMan(:hello_worker).enq_some_task(:arg => "hello_world",:job_key => "boy")</pre>
+
+With _BackgrounDRb_ version >= 1.1, you can also schedule a persistent task to be executed at a particular time,
+
+<pre class="multiline">MiddleMan(:hello_worker).enq_some_task(:arg => "hello_world",
+ :job_key => "boy",:scheduled_at => (Time.now + 1.hour))</pre>
+
+Above line will add specified task to the job queue and set to be invoked at specified time. For more information
+about scheduling see scheduling section.
+
+
p(sub-title). Start a Worker :
To start a worker from rails:
@@ -154,6 +154,7 @@ class HelloWorker
end
end</pre>
+
%(entry-title)<a href="testing">Testing Workers </a>%
_BackgrounDRb_ comes with a baked in mechanism to write test cases. First make sure that you
@@ -24,9 +24,15 @@ def self.parse_cmd_options(argv)
def self.read_config(config_file)
config = YAML.load(ERB.new(IO.read(config_file)).result)
environment = ENV["RAILS_ENV"] || config[:backgroundrb][:environment] || "development"
- silence_warnings do
+
+ if respond_to?(:silence_warnings)
+ silence_warnings do
+ Object.const_set("RAILS_ENV",environment)
+ end
+ else
Object.const_set("RAILS_ENV",environment)
end
+
ENV["RAILS_ENV"] = environment
config
end
@@ -4,9 +4,10 @@ def self.find_next(worker_name,worker_key = nil)
returned_job = nil
transaction do
unless worker_key
- t_job = find(:first,:conditions => { :worker_name => worker_name,:taken => 0},:lock => true)
+ #use ruby time stamps for time calculations as db might have different times than what is calculated by ruby/rails
+ t_job = find(:first,:conditions => [" worker_name = ? AND taken = ? AND scheduled_at <= ? ", worker_name, 0, Time.now.utc ],:lock => true)
else
- t_job = find(:first,:conditions => { :worker_name => worker_name,:taken => 0,:worker_key => worker_key },:lock => true)
+ t_job = find(:first,:conditions => [" worker_name = ? AND taken = ? AND worker_key = ? AND scheduled_at <= ? ", worker_name, 0, worker_key, Time.now.utc ],:lock => true)
end
if t_job
t_job.taken = 1
@@ -34,12 +35,23 @@ def self.insert_job(options = { })
end
end
+ def self.remove_job(options = { })
+ transaction do
+ t_job_id = find(:first, :conditions => options.merge(:finished => 0,:taken => 0),:lock => true)
+ delete(t_job_id)
+ end
+ end
+
def finish!
self.class.transaction do
self.finished = 1
self.finished_at = Time.now
+ self.job_key = "finished_#{Time.now.to_i}_#{job_key}"
self.save
end
+ Thread.current[:persistent_job_id] = nil
+ Thread.current[:job_key] = nil
+ nil
end
end
@@ -13,7 +13,8 @@ def method_missing(method_id,*args)
worker_method = method_id.to_s
arguments = args.first
- arg,job_key,host_info = arguments && arguments.values_at(:arg,:job_key,:host)
+ arg,job_key,host_info,scheduled_at = arguments && arguments.values_at(:arg,:job_key,:host,:scheduled_at)
+ new_schedule = (scheduled_at && scheduled_at.respond_to?(:utc)) ? scheduled_at.utc : Time.now.utc
if worker_method =~ /^async_(\w+)/
method_name = $1
@@ -26,7 +27,12 @@ def method_missing(method_id,*args)
marshalled_args = Marshal.dump(arg)
enqueue_task(compact(:worker_name => worker_name.to_s,:worker_key => worker_key.to_s,
:worker_method => method_name.to_s,:job_key => job_key.to_s,
- :args => marshalled_args,:timeout => arguments ? arguments[:timeout] : nil))
+ :args => marshalled_args,:timeout => arguments ? arguments[:timeout] : nil,:scheduled_at => new_schedule))
+ elsif worker_method =~ /^deq_(\w+)/i
+ raise NoJobKey.new("Must specify a job key to dequeue tasks") if job_key.blank?
+ method_name = $1
+ dequeue_task(compact(:worker_name => worker_name.to_s,:worker_key => worker_key.to_s,
+ :worker_method => method_name.to_s,:job_key => job_key.to_s))
else
worker_options = compact(:worker => worker_name,:worker_key => worker_key,
:worker_method => worker_method,:job_key => job_key,:arg => arg)
@@ -38,6 +44,10 @@ def enqueue_task options = {}
BdrbJobQueue.insert_job(options)
end
+ def dequeue_task options = {}
+ BdrbJobQueue.remove_job(options)
+ end
+
def run_method host_info,method_name,worker_options = {}
result = []
connection = choose_connection(host_info)
@@ -47,7 +57,7 @@ def run_method host_info,method_name,worker_options = {}
elsif host_info == :all
succeeded = false
begin
- connection.each { |conn| result << invoke_on_connection(connection,method_name,worker_options) }
+ connection.each { |conn| result << invoke_on_connection(conn,method_name,worker_options) }
succeeded = true
rescue BdrbConnError; end
raise NoServerAvailable.new("No BackgrounDRb server is found running") unless succeeded
@@ -98,7 +98,7 @@ def load_and_invoke(worker_name,p_method,data)
begin
require worker_name.to_s
worker_key = Packet::Guid.hexdigest
- @reactor.start_worker(:worker => worker_name,:worker_key => worker_key)
+ @reactor.start_worker(:worker => worker_name,:worker_key => worker_key,:disable_log => true)
worker_name_key = gen_worker_key(worker_name,worker_key)
data_request = {:data => { :worker_method => p_method,:arg => data[:data]},
:type => :request, :result => false
View
@@ -5,20 +5,27 @@ class PacketLogger
def initialize(worker,log_flag = true)
@log_flag = log_flag
@worker = worker
+ @log_mutex = Mutex.new
end
def info(p_data)
return unless @log_flag
- @worker.send_request(:worker => :log_worker, :data => p_data)
+ @log_mutex.synchronize do
+ @worker.send_request(:worker => :log_worker, :data => p_data)
+ end
end
def debug(p_data)
return unless @log_flag
- @worker.send_request(:worker => :log_worker, :data => p_data)
+ @log_mutex.synchronize do
+ @worker.send_request(:worker => :log_worker, :data => p_data)
+ end
end
def error(p_data)
return unless @log_flag
- @worker.send_request(:worker => :log_worker, :data => p_data)
+ @log_mutex.synchronize do
+ @worker.send_request(:worker => :log_worker, :data => p_data)
+ end
end
end
# == MetaWorker class
@@ -45,6 +45,11 @@ namespace :backgroundrb do
Rake::Task['backgroundrb:queue_migration'].invoke
end
+ desc "Drops and recreate backgroundrb queue table"
+ task :redo_queue do
+ redo_queue_migration
+ end
+
desc 'update backgroundrb config files from your rails application'
task :update do
temp_scripts = ["backgroundrb","load_worker_env.rb"].map {|x| "#{RAILS_ROOT}/script/#{x}"}
@@ -3,7 +3,7 @@
:ip: 0.0.0.0
:port: 11008
:log: foreground
- :environment: production
+ :environment: development
:result_storage: memcache
:memcache: "localhost:11211"
@@ -11,9 +11,10 @@
end
specify "should setup correct environment from conf file" do
+ ENV["RAILS_ENV"] = nil
BackgrounDRb::Config.parse_cmd_options([])
BackgrounDRb::Config.read_config(conf_file)
- ENV["RAILS_ENV"].should == "production"
- RAILS_ENV.should == "production"
+ ENV["RAILS_ENV"].should == "development"
+ RAILS_ENV.should == "development"
end
end
@@ -3,5 +3,44 @@
require "bdrb_job_queue"
context "For BackgrounDRb job Queues" do
+ setup do
+ db_config_file = YAML.load(ERB.new(IO.read("#{RAILS_HOME}/config/database.yml")).result)
+ ActiveRecord::Base.establish_connection(db_config_file["test"])
+ BdrbJobQueue.destroy_all
+ end
+ specify "should insert job with proper params" do
+ BdrbJobQueue.insert_job(:worker_name => "hello_world",:worker_method => "foovar",:job_key => "cats",:args => "hello_world",:scheduled_at => Time.now.utc)
+ next_job = BdrbJobQueue.find_next("hello_world")
+ next_job.taken.should == 1
+ next_job.started_at.should.not.be nil
+ next_job.job_key.should == "cats"
+ next_job.worker_name.should == "hello_world"
+ next_job.worker_method.should == "foovar"
+ end
+
+ specify "release_job should worker properly" do
+ BdrbJobQueue.insert_job(:worker_name => "hello_world",:worker_method => "foovar",:job_key => "cats",:args => "hello_world",:scheduled_at => Time.now.utc)
+ next_job = BdrbJobQueue.find_next("hello_world")
+ next_job.release_job
+ t = BdrbJobQueue.find_by_job_key("cats")
+ t.taken.should == 0
+ t.started_at.should == nil
+ end
+
+ specify "remove job should work properly" do
+ BdrbJobQueue.insert_job(:worker_name => "hello_world",:worker_method => "foovar",:job_key => "cats",:args => "hello_world",:scheduled_at => Time.now.utc)
+ BdrbJobQueue.remove_job(:worker_name => "hello_world",:worker_method => "foovar",:job_key => "cats")
+ t = BdrbJobQueue.find_by_job_key("cats")
+ t.should == nil
+ end
+
+ specify "finish should work properly" do
+ BdrbJobQueue.insert_job(:worker_name => "hello_world",:worker_method => "foovar",:job_key => "cats",:args => "hello_world",:scheduled_at => Time.now.utc)
+ t = BdrbJobQueue.find_next("hello_world")
+ t.finish!
+ t.finished.should == 1
+ t.finished_at.should.not == nil
+ t.job_key.should.match(/finished_\d+_cats/i)
+ end
end
@@ -11,6 +11,7 @@
@cluster_conn = mock
@worker_proxy = BackgrounDRb::RailsWorkerProxy.new(:hello_worker,nil,@cluster_conn)
end
+
specify "should let you fetch results" do
@cluster_conn.expects(:backend_connections).returns([])
foo = @worker_proxy.ask_result(:foobar)
@@ -34,6 +35,13 @@
end
specify "delete method should run on all nodes" do
+ conn_array = (0..3).map do |i|
+ t = mock()
+ t.expects(:delete_worker).with(:worker => :hello_worker).returns(nil)
+ t
+ end
+ @cluster_conn.expects(:backend_connections).returns(conn_array)
+ @worker_proxy.delete
end
specify "should let you invoke worker_info method" do
@@ -54,6 +62,40 @@
@cluster_conn.expects(:find_connection).returns(actual_conn)
@worker_proxy.async_foobar(:arg => :hello,:job_key => "boy",
:host => "192.168.2.100:100")
+ end
+
+ specify "for enqueued tasks" do
+ BdrbJobQueue = mock() unless Object.const_defined?(:BdrbJobQueue)
+ BdrbJobQueue.expects(:insert_job).with() { |value|
+ value[:worker_name].should == "hello_worker"
+ value[:worker_method].should == "foobar"
+ value[:scheduled_at].should.not == nil
+ value[:job_key] == "catz"
+ }
+ @worker_proxy.enq_foobar(:arg => :hello,:job_key => "catz")
+ end
+
+ specify "for removing tasks from the queue" do
+ BdrbJobQueue = mock() unless Object.const_defined?(:BdrbJobQueue)
+ BdrbJobQueue.expects(:remove_job).with() do |value|
+ value[:worker_name] == "hello_worker"
+ value[:worker_method] == "foobar"
+ value[:job_key] == "catz"
+ end
+ @worker_proxy.deq_foobar(:job_key => "catz")
+ end
+
+ specify "should run task on all servers if asked" do
+ backend_connections = []
+ 2.times { |i|
+ actual_conn = mock()
+ actual_conn.expects(:ask_work).with(:worker => :hello_worker,:worker_method => 'foobar',:job_key => 'hello')
+ backend_connections << actual_conn
+ }
+ @cluster_conn.expects(:backend_connections).returns(backend_connections)
+ a = @worker_proxy.async_foobar(:job_key => "hello",:host => :all)
+ end
+ specify "should switch connections if invoke fails on chosen one" do
end
end

0 comments on commit 5438cc0

Please sign in to comment.