Skip to content

Commit

Permalink
make taks runnable on specific hosts
Browse files Browse the repository at this point in the history
  • Loading branch information
gnufied committed Jul 3, 2008
1 parent 602b285 commit 0d74b45
Show file tree
Hide file tree
Showing 6 changed files with 134 additions and 53 deletions.
19 changes: 11 additions & 8 deletions lib/backgroundrb/bdrb_cluster_connection.rb
Expand Up @@ -74,6 +74,16 @@ def find_next_except_these connections
chosen chosen
end end


def find_connection host_info
conn = @backend_connections.detect { |x| x.server_info == host_info }
raise NoServerAvailable.new("BackgrounDRb server is not found running on #{host_info}") unless conn
return conn
end

def find_local
find_connection("#{BDRB_CONFIG[:backgroundrb][:ip]}:#{BDRB_CONFIG[:backgroundrb][:port]}")
end

def worker(worker_name,worker_key = nil) def worker(worker_name,worker_key = nil)
update_stats update_stats
RailsWorkerProxy.new(worker_name,worker_key,self) RailsWorkerProxy.new(worker_name,worker_key,self)
Expand Down Expand Up @@ -115,14 +125,7 @@ def new_worker options = {}
raise NoServerAvailable.new("No BackgrounDRb server is found running") unless succeeded raise NoServerAvailable.new("No BackgrounDRb server is found running") unless succeeded
end end


def choose_server(host_info = nil) def choose_server
if host_info
return @backend_connections if host_info == :all
conn = @backend_connections.detect { |x| x.server_info == host_info }
raise NoServerAvailable.new("BackgrounDRb server is not found running on #{host_info}") unless conn
return conn
end

if @round_robin.empty? if @round_robin.empty?
@round_robin = (0...@backend_connections.length).to_a @round_robin = (0...@backend_connections.length).to_a
end end
Expand Down
118 changes: 86 additions & 32 deletions lib/backgroundrb/rails_worker_proxy.rb
Expand Up @@ -11,48 +11,102 @@ def initialize(p_worker_name,p_worker_key = nil,p_middle_man = nil)


def method_missing(method_id,*args) def method_missing(method_id,*args)
worker_method = method_id worker_method = method_id
arguments = *args arguments = arguments.first


result = nil arg,job_key,host_info = arguments && arguments.values_at(:arg,:job_key,:host)


connection = (Hash === arguments ) ? middle_man.choose_server(arguments[:host]) : middle_man.choose_server if worker_method =~ /^async_(\w+)/
@tried_connections << connection.server_info method_name = $1

wokrer_options = compact(:worker => worker_name,:worker_key => worker_key,:worker_method => method_name,:job_key => job_key, :arg => arg)
begin run_method(host_info,:ask_work,wokrer_options)
result = invoke_on_connection(connection,worker_method,data) elsif worker_method =~ /^enq_(\w+)/i
rescue BdrbConnError => e method_name = $1
connection = middle_man.find_next_except_these(@tried_connections) args = Marshal.dump(arg)
@tried_connections << connection.server_info enqueue_task(compact(:worker_name => worker_name.to_s,:worker_key => worker_key.to_s,:worker_method => method_name.to_s,:job_key => job_key.to_s, :args => args,:timeout => data[:timeout]))
retry else
worker_options = compact(:worker => worker_name,:worker_key => worker_key,:worker_method => worker_method,:job_key => job_key,:arg => arg)
run_method(host_info,:send_request,worker_options)
end end
end end


def invoke_on_connection connection,worker_method,data def enqueue_task options = {}
raise NoServerAvailable.new("No BackgrounDRb server is found running") unless connection BdrbJobQueue.insert_job(options)
case worker_method end
when :ask_result
connection.ask_result(compact(:worker => worker_name,:worker_key => worker_key,:job_key => data[0])) def run_method host_info,method_name,worker_options = {}
when :worker_info result = []
connection.worker_info(compact(:worker => worker_name,:worker_key => worker_key)) connection = choose_connection(host_info)
when :delete raise NoServerAvailable.new("No BackgrounDRb server is found running") if connection.blank?
connection.delete_worker(compact(:worker => worker_name, :worker_key => worker_key)) if host_info == :local or host_info.is_a?(String)
result << invoke_on_connection(connection,method_name,worker_options)
elsif host_info == :all
succeeded = false
begin
connection.each { |conn| result << invoke_on_connection(connection,method_name,worker_options) }
succeeded = true
rescue BdrbConnError; end
raise NoServerAvailable.new("No BackgrounDRb server is found running") unless succeeded
else else
choose_method(worker_method.to_s,data,connection) @tried_connections << connection
begin
result << invoke_on_connection(connection,method_name,worker_options)
rescue BdrbConnError => e
connection = middle_man.find_next_except_these(@tried_connections)
@tried_connections << connection
retry
end
end end
return nil if method_name == :ask_work
return_result(result)
end end


def choose_method worker_method,data,connection def invoke_on_connection connection,method_name,options = {}
job_key = data[0] raise NoServerAvailable.new("No BackgrounDRb is found running") unless connection
if worker_method =~ /^async_(\w+)/ connection.send(method_name,options)
method_name = $1 end
connection.ask_work(compact(:worker => worker_name,:worker_key => worker_key,:worker_method => method_name,:job_key => job_key, :arg => data[1..-1]))
elsif worker_method =~ /^enq_(\w+)/i def ask_result job_key
method_name = $1 options = compact(:worker => worker_name,:worker_key => worker_key,:job_key => job_key)
args = Marshal.dump([data[1]]) if BDRB_CONFIG[:backgroundrb][:result_storage] == 'memcache'
options = data[2] || {} return_result_from_memcache(options)
connection.enqueue_task(compact(:worker_name => worker_name.to_s,:worker_key => worker_key.to_s,:worker_method => method_name.to_s,:job_key => job_key.to_s, :args => args,:timeout => options[:timeout]))
else else
connection.send_request(compact(:worker => worker_name,:worker_key => worker_key,:worker_method => worker_method,:job_key => job_key,:arg => data[1..-1])) result = middle_man.backend_connections.map { |conn| conn.ask_result(options) }
return_result(result)
end
end

def worker_info
t_connections = middle_man.backend_connections
result = t_connections.map { |conn| conn.worker_info(compact(:worker => worker_name,:worker_key => worker_key)) }
return_result(result)
end

def gen_key options
key = [options[:worker],options[:worker_key],options[:job_key]].compact.join('_')
key
end

def return_result_from_memcache options = {}
middle_man.cache[gen_key(options)]
end

def return_result result
result = Array(result)
result.size <= 1 ? result[0] : result
end

def delete
middle_man.backend_connections.each do |connection|
connection.delete_worker(compact(:worker => worker_name, :worker_key => worker_key))
end
end

def choose_connection host_info
case host_info
when :all; middle_man.backend_connections
when :local; middle_man.find_local
when String; middle_man.find_connection(host_info)
else; middle_man.choose_server
end end
end end


Expand Down
1 change: 1 addition & 0 deletions script/backgroundrb
Expand Up @@ -70,3 +70,4 @@ when 'stop'
else else
BackgrounDRb::MasterProxy.new() BackgrounDRb::MasterProxy.new()
end end

2 changes: 1 addition & 1 deletion server/lib/bdrb_thread_pool.rb
@@ -1,6 +1,6 @@
module BackgrounDRb module BackgrounDRb
class WorkData class WorkData
attr_accessor :args,:block,:job_method) attr_accessor :args,:block,:job_method
def initialize(args,job_key,job_method) def initialize(args,job_key,job_method)
@args = args @args = args
@job_key = job_key @job_key = job_key
Expand Down
17 changes: 17 additions & 0 deletions tasks/backgroundrb_tasks.rake
Expand Up @@ -80,9 +80,26 @@ namespace :backgroundrb do
setup_queue_migration setup_queue_migration
end end


desc 'update backgroundrb config files from your rails application'
task :update do
temp_scripts = ["backgroundrb","load_worker_env.rb"].map {|x| "#{RAILS_ROOT}/script/#{x}"}
temp_scripts.each do |file_name|
if File.exists?(file_name)
puts "Removing #{file_name} ..."
FileUtils.rm(file_name,:force => true)
end
end
new_temp_scripts = ["backgroundrb","load_worker_env.rb"].map {|x| File.dirname(__FILE__) + "/../script/#{x}" }
new_temp_scripts.each do |file_name|
puts "Updating file #{File.expand_path(file_name)} ..."
FileUtils.cp_r(file_name,"#{RAILS_ROOT}/script/")
end
end

desc 'Remove backgroundrb from your rails application' desc 'Remove backgroundrb from your rails application'
task :remove do task :remove do
script_src = "#{RAILS_ROOT}/script/backgroundrb" script_src = "#{RAILS_ROOT}/script/backgroundrb"
temp_scripts = ["backgroundrb","load_worker_env.rb"].map {|x| "#{RAILS_ROOT}/script/#{x}"}


if File.exists?(script_src) if File.exists?(script_src)
puts "Removing #{script_src} ..." puts "Removing #{script_src} ..."
Expand Down
30 changes: 18 additions & 12 deletions test/client/test_worker_proxy.rb
@@ -1,25 +1,31 @@
require File.join(File.dirname(__FILE__) + "/bdrb_test_helper") require File.join(File.dirname(__FILE__) + "/../bdrb_test_helper")
require File.join(RAILS_HOME + "/config/environment") require File.join(RAILS_HOME + "/config/environment")
require "backgroundrb" require "backgroundrb"


context "Worker Proxy in general" do context "Worker Proxy in general" do
specify "should let you invoke ask_work method" do setup do

@cluster_conn = mock
@worker_proxy = BackgrounDRb::RailsWorkerProxy.new(:hello_worker,nil,@cluster_conn)
end end

specify "should let you fetch results" do
@cluster_conn.expects(:backend_connections).returns([])
foo = @worker_proxy.ask_result(:foobar)
foo.should.be nil
end

specify "should let you invoke send_request method" do specify "should let you invoke send_request method" do

end end

specify "should let you invoke delete method" do specify "should let you invoke delete method" do

end end

specify "should let you invoke worker_info method" do specify "should let you invoke worker_info method" do

end end

specify "should let you invoke ask_status method" do specify "should let you invoke ask_status method" do

end end
end end

0 comments on commit 0d74b45

Please sign in to comment.