From 4b4a151cbde72ff724466921cd925f21e3274589 Mon Sep 17 00:00:00 2001 From: gnufied Date: Thu, 19 Jun 2008 13:08:44 +0530 Subject: [PATCH] repository is in flux, changes for clustering and cleaner result retrival --- ChangeLog | 38 ++++++++++++++++---------- doc/content/content.txt | 9 ++++++ lib/backgroundrb/bdrb_connection.rb | 12 +++++--- lib/backgroundrb/rails_worker_proxy.rb | 16 +++++------ server/lib/bdrb_result_storage.rb | 26 ++++++++++++++++++ server/lib/bdrb_thread_pool.rb | 26 +++++++++++------- server/lib/master_worker.rb | 32 +++++++++++----------- server/lib/meta_worker.rb | 19 +++++++++++++ test_points.org | 15 ++++++++++ 9 files changed, 141 insertions(+), 52 deletions(-) diff --git a/ChangeLog b/ChangeLog index 714d0d4..1573e57 100644 --- a/ChangeLog +++ b/ChangeLog @@ -1,30 +1,40 @@ +2008-06-19 hemant kumar + + * Make binary parser iterative and hence won't blow your stack + + * Add run_concurrent + + * Let parent sleep for a while when forking the bdrb master process. + + * Fixed issue with fork. BackgrounDRb now uses fork and exec. + 2008-02-28 hemant kumar - * fixed some meory leaks. + * fixed some meory leaks. - * Implemented cleaner API for invoking tasks in workers + * Implemented cleaner API for invoking tasks in workers - * Updated the documentation + * Updated the documentation 2008-02-25 hemant kumar - * Commited Patch by Alex which lets BackgrounDRb to have command line arguments and loading of specific environments - through command line argument. + * Commited Patch by Alex which lets BackgrounDRb to have command line arguments and loading of specific environments + through command line argument. 2008-02-14 hemant kumar - * Added TestCases for Cron Triggers, Meta Workers and stuff. We are heading towards proper code coverage with specs. + * Added TestCases for Cron Triggers, Meta Workers and stuff. We are heading towards proper code coverage with specs. 
+ + * Added preliminary support for starting a worker on demand through scheduler. What it means is, when you have a worker which is getting + scheduled very less frequently and you don't want the worker to persist, you can ask BackgrounDRb to restart the worker on each schedule. - * Added preliminary support for starting a worker on demand through scheduler. What it means is, when you have a worker which is getting - scheduled very less frequently and you don't want the worker to persist, you can ask BackgrounDRb to restart the worker on each schedule. + * Fixed some unreported issues with writing data to socket between workers and stuff. - * Fixed some unreported issues with writing data to socket between workers and stuff. + * Fixed issues with too many open connections, because connections were not getting closed. BackgrounDRb now opens only one connection, which is + reused throughout the lifecycle of rails application. - * Fixed issues with too many open connections, because connections were not getting closed. BackgrounDRb now opens only one connection, which is - reused throughout the lifecycle of rails application. + * Fixed all outstanding issues with Cron triggers, BackgrounDRb now explicitly depends on "chronic" gem. - * Fixed all outstanding issues with Cron triggers, BackgrounDRb now explicitly depends on "chronic" gem. + * Removed Framework directory and BackgrounDRb now explicitly depends on packet gem. - * Removed Framework directory and BackgrounDRb now explicitly depends on packet gem. - diff --git a/doc/content/content.txt b/doc/content/content.txt index 272c6d0..af7fe93 100644 --- a/doc/content/content.txt +++ b/doc/content/content.txt @@ -40,6 +40,15 @@ you can install the plugin from git:
 git clone git://github.com/gnufied/backgroundrb.git 
+Also, for running the git version of BackgrounDRb you will need the git version of packet. + +
+git clone git://github.com/gnufied/packet.git
+cd packet;rake gem
+cd pkg; sudo gem install --local packet-0.1.6.gem
+ + +If you get an error while building the gem, you probably need to install the RSpec gem. + p(sub-title). Installation using Piston
piston import http://svn.devjavu.com/backgroundrb/trunk/ backgroundrb 
diff --git a/lib/backgroundrb/bdrb_connection.rb b/lib/backgroundrb/bdrb_connection.rb index e503c42..424f3f2 100644 --- a/lib/backgroundrb/bdrb_connection.rb +++ b/lib/backgroundrb/bdrb_connection.rb @@ -16,8 +16,8 @@ def initialize @mutex = Mutex.new end - def worker(worker_name,job_key = nil) - RailsWorkerProxy.worker(worker_name,job_key,self) + def worker(worker_name,worker_key = nil) + RailsWorkerProxy.worker(worker_name,worker_key,self) end def establish_connection @@ -96,7 +96,7 @@ def new_worker p_data p_data[:type] = :start_worker dump_object(p_data) close_connection - p_data[:job_key] + p_data[:worker_key] end def worker_info(p_data) @@ -108,6 +108,10 @@ def worker_info(p_data) bdrb_response end + def ask_result options = {} + + end + def all_worker_info p_data = { } @@ -156,7 +160,7 @@ def ask_status(p_data) end def read_from_bdrb(timeout = 3) - #@tokenizer = Packet::BinParser.new + # @tokenizer = Packet::BinParser.new begin ret_val = select([@connection],nil,nil,timeout) return nil unless ret_val diff --git a/lib/backgroundrb/rails_worker_proxy.rb b/lib/backgroundrb/rails_worker_proxy.rb index e2e4a5e..8987f22 100644 --- a/lib/backgroundrb/rails_worker_proxy.rb +++ b/lib/backgroundrb/rails_worker_proxy.rb @@ -1,12 +1,12 @@ module BackgrounDRb class RailsWorkerProxy - attr_accessor :worker_name, :worker_method, :data, :job_key,:middle_man + attr_accessor :worker_name, :worker_method, :data, :worker_key,:middle_man - def self.worker(p_worker_name,p_job_key = nil,p_middle_man = nil) + def self.worker(p_worker_name,p_worker_key = nil,p_middle_man = nil) t = new t.worker_name = p_worker_name t.middle_man = p_middle_man - t.job_key = p_job_key + t.worker_key = p_worker_key t end @@ -16,16 +16,16 @@ def method_missing(method_id,*args) flag = args[1] case worker_method when :ask_status - middle_man.ask_status(compact(:worker => worker_name,:job_key => job_key)) + middle_man.ask_status(compact(:worker => worker_name,:worker_key => worker_key)) when :worker_info 
- middle_man.worker_info(compact(:worker => worker_name,:job_key => job_key)) + middle_man.worker_info(compact(:worker => worker_name,:worker_key => worker_key)) when :delete - middle_man.delete_worker(compact(:worker => worker_name, :job_key => job_key)) + middle_man.delete_worker(compact(:worker => worker_name, :worker_key => worker_key)) else if flag - middle_man.send_request(compact(:worker => worker_name,:job_key => job_key,:worker_method => worker_method,:data => data)) + middle_man.send_request(compact(:worker => worker_name,:worker_key => worker_key,:worker_method => worker_method,:data => data)) else - middle_man.ask_work(compact(:worker => worker_name,:job_key => job_key,:worker_method => worker_method,:data => data)) + middle_man.ask_work(compact(:worker => worker_name,:worker_key => worker_key,:worker_method => worker_method,:data => data)) end end end diff --git a/server/lib/bdrb_result_storage.rb b/server/lib/bdrb_result_storage.rb index 0b9c458..6f87b27 100644 --- a/server/lib/bdrb_result_storage.rb +++ b/server/lib/bdrb_result_storage.rb @@ -2,6 +2,32 @@ module BackgrounDRb # Will use Memcache if specified in configuration file # otherwise it will use in memory hash class ResultStorage + attr_accessor :storage_type + def initalize storage_type = nil + @cache = (storage_type == :memcache) ? 
memcache_instance : {} + end + def [] key + @cache[key] + end + + def []= key,value + @cache[key,value] + end + + def memcache_instance + require 'memcache' + memcache_options = { + :c_threshold => 10_000, + :compression => true, + :debug => false, + :namespace => 'backgroundrb_result_hash', + :readonly => false, + :urlencode => false + } + t_cache = MemCache.new(memcache_options) + t_cache.servers = CONFIG_FILE[:memcache].split(',') + t_cache + end end end diff --git a/server/lib/bdrb_thread_pool.rb b/server/lib/bdrb_thread_pool.rb index bab65e2..b52085c 100644 --- a/server/lib/bdrb_thread_pool.rb +++ b/server/lib/bdrb_thread_pool.rb @@ -1,27 +1,29 @@ module BackgrounDRb class WorkData attr_accessor :data,:block - def initialize(args,&block) + def initialize(args,job_key,&block) @data = args + @job_key = job_key @block = block end end class ParallelData - attr_accessor :data,:block,:response_block,:guid - def initialize(args,block,response_block) + attr_accessor :data,:block,:response_block,:job_key + def initialize(args,job_key,block,response_block) @data = args @block = block @response_block = response_block - @guid = Packet::Guid.hexdigest + @job_key = job_key end end class ResultData - attr_accessor :data,:block - def initialize args,&block + attr_accessor :data,:block,:job_key + def initialize args,job_key,&block @data = args @block = block + @job_key = job_key end end @@ -59,23 +61,27 @@ def initialize(size,logger) # assuming method is defined in rss_worker def defer(*args,&block) - @work_queue << WorkData.new(args,&block) + job_key = Thread.current[:job_key] + @work_queue << WorkData.new(args,job_key,&block) end # Same as defer, but can be used to run a block in a seperate thread and collect results back # in main thread - def fetch_parallely(args,process_block,response_block) - @work_queue << ParallelData.new(args,process_block,response_block) + def run_concurrent(args,process_block,response_block) + job_key = Thread.current[:job_key] + @work_queue << 
ParallelData.new(args,job_key,process_block,response_block) end def add_thread @threads << Thread.new do + Thread.current[:job_key] = nil while true task = @work_queue.pop + job_key = task.job_key @running_tasks << task block_result = run_task(task) if task.is_a? ParallelData - @result_queue << ResultData.new(block_result,&task.response_block) + @result_queue << ResultData.new(block_result,job_key,&task.response_block) end @running_tasks.pop end diff --git a/server/lib/master_worker.rb b/server/lib/master_worker.rb index 8745295..5deb051 100644 --- a/server/lib/master_worker.rb +++ b/server/lib/master_worker.rb @@ -47,9 +47,9 @@ def receive_data p_data # def pass_worker_info(t_data) - worker_name_key = gen_worker_key(t_data[:worker],t_data[:job_key]) + worker_name_key = gen_worker_key(t_data[:worker],t_data[:worker_key]) worker_instance = reactor.live_workers[worker_name_key] - info_response = { :worker => t_data[:worker],:job_key => t_data[:job_key]} + info_response = { :worker => t_data[:worker],:worker_key => t_data[:worker_key]} worker_instance ? (info_response[:status] = :running) : (info_response[:status] = :stopped) send_object(info_response) end @@ -57,8 +57,8 @@ def pass_worker_info(t_data) def all_worker_info(t_data) info_response = [] reactor.live_workers.each do |key,value| - job_key = (value.worker_key.to_s).gsub(/#{value.worker_name}_?/,"") - info_response << { :worker => value.worker_name,:job_key => job_key,:status => :running } + worker_key = (value.worker_key.to_s).gsub(/#{value.worker_name}_?/,"") + info_response << { :worker => value.worker_name,:worker_key => worker_key,:status => :running } end send_object(info_response) end @@ -73,8 +73,8 @@ def query_all_worker_status(p_data) # it could be a good idea to remove it here itself. 
def delete_drb_worker(t_data) worker_name = t_data[:worker] - job_key = t_data[:job_key] - worker_name_key = gen_worker_key(worker_name,job_key) + worker_key = t_data[:worker_key] + worker_name_key = gen_worker_key(worker_name,worker_key) begin worker_instance = reactor.live_workers[worker_name_key] Process.kill('TERM',worker_instance.pid) @@ -92,7 +92,7 @@ def start_worker_request(p_data) def process_work(t_data) worker_name = t_data[:worker] - worker_name_key = gen_worker_key(worker_name,t_data[:job_key]) + worker_name_key = gen_worker_key(worker_name,t_data[:worker_key]) t_data.delete(:worker) t_data.delete(:type) begin @@ -109,15 +109,15 @@ def process_work(t_data) def process_status(t_data) worker_name = t_data[:worker] - job_key = t_data[:job_key] - worker_name_key = gen_worker_key(worker_name,job_key) + worker_key = t_data[:worker_key] + worker_name_key = gen_worker_key(worker_name,worker_key) status_data = reactor.result_hash[worker_name_key.to_sym] send_object(status_data) end def process_request(t_data) worker_name = t_data[:worker] - worker_name_key = gen_worker_key(worker_name,t_data[:job_key]) + worker_name_key = gen_worker_key(worker_name,t_data[:worker_key]) t_data.delete(:worker) t_data.delete(:type) begin @@ -166,9 +166,9 @@ def initialize end end - def gen_worker_key(worker_name,job_key = nil) - return worker_name if job_key.nil? - return "#{worker_name}_#{job_key}".to_sym + def gen_worker_key(worker_name,worker_key = nil) + return worker_name if worker_key.nil? 
+ return "#{worker_name}_#{worker_key}".to_sym end @@ -230,9 +230,9 @@ def reload_workers def load_and_invoke(worker_name,p_method,data) begin require worker_name.to_s - job_key = Packet::Guid.hexdigest - @reactor.start_worker(:worker => worker_name,:job_key => job_key) - worker_name_key = gen_worker_key(worker_name,job_key) + worker_key = Packet::Guid.hexdigest + @reactor.start_worker(:worker => worker_name,:worker_key => worker_key) + worker_name_key = gen_worker_key(worker_name,worker_key) data_request = {:data => { :worker_method => p_method,:data => data[:data]}, :type => :request, :result => false } diff --git a/server/lib/meta_worker.rb b/server/lib/meta_worker.rb index 8f8a69c..57f5e6f 100644 --- a/server/lib/meta_worker.rb +++ b/server/lib/meta_worker.rb @@ -110,8 +110,11 @@ def self.reload_on_schedule(flag = nil) def worker_init @config_file = BackgrounDRb::Config.read_config("#{RAILS_HOME}/config/backgroundrb.yml") log_flag = @config_file[:backgroundrb][:debug_log].nil? ? true : @config_file[:backgroundrb][:debug_load_rails_env] + # stores the job key of currently running job + Thread.current[:job_key] = nil @logger = PacketLogger.new(self,log_flag) @thread_pool = ThreadPool.new(pool_size || 20,@logger) + @result_queue = Queue.new if(worker_options && worker_options[:schedule] && no_auto_load) load_schedule_from_args @@ -127,6 +130,8 @@ def worker_init @logger.info "Schedules for worker loaded" end + def job_key; Thread.current[:job_key]; end + # loads workers schedule from options supplied from rails # a user may pass trigger arguments to dynamically define the schedule def load_schedule_from_args @@ -163,11 +168,15 @@ def process_request(p_data) end called_method_arity = self.method(user_input[:worker_method]).arity result = nil + + Thread.current[:job_key] = user_input[:job_key] + if called_method_arity != 0 result = self.send(user_input[:worker_method],user_input[:data]) else result = self.send(user_input[:worker_method]) end + if p_data[:result] 
result = "dummy_result" unless result send_response(p_data,result) if can_dump?(result) @@ -238,6 +247,15 @@ def send_response input,output end end + def cache_set key,value + t_result = ResultData.new(value,key) + @result_queue.push(t_result) + end + + def cache_get key + # should fetch data from real source. + end + def unbind; end def connection_completed; end @@ -246,6 +264,7 @@ def check_for_thread_responses 10.times do break if thread_pool.result_empty? result_object = thread_pool.result_pop + Thread.current[:job_key] = result_object.job_key (result_object.block).call(result_object.data) end end diff --git a/test_points.org b/test_points.org index 1298a55..bcdd664 100644 --- a/test_points.org +++ b/test_points.org @@ -11,6 +11,21 @@ ** sending of rails AR objects is working ** sending of rails AR objects with plugin is working ** Environment is getting loaded properly through config file and command line argument +** People are unable to call ActionController methods + +* Problem with new experimental API +** there is an issue if user invokes multiple tasks in thread pool directly from one of the worker + under current settings they are going to end up with same job key. + +** Also, new_worker can't have same method invocation conventions because it accepts more parameters. + +* Ideas +** Okay, use: + cache[job_key] = result + cache[job_key] => + will return the result. worker_name, worker_key shall be taken in to account. + +