Commit

repository is in flux, changes for clustering and cleaner result retrieval

gnufied committed Jun 19, 2008
1 parent 03f89e2 commit 4b4a151
Showing 9 changed files with 141 additions and 52 deletions.
38 changes: 24 additions & 14 deletions ChangeLog
@@ -1,30 +1,40 @@
2008-06-19 hemant kumar <hemant@shire.com>

* Make the binary parser iterative so it won't blow your stack

* Add run_concurrent for running a block on the thread pool and collecting its result back in the main thread (see the sketch after this entry)

* Let parent sleep for a while when forking the bdrb master process.

* Fixed issue with fork. BackgrounDRb now uses fork and exec.
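
A hedged usage sketch of run_concurrent from inside a worker. The worker class, method and URL here are hypothetical; the run_concurrent(args, process_block, response_block) signature and the thread_pool, job_key and cache_set helpers are taken from the diffs below:

require 'net/http'
require 'uri'

class PageWorker < BackgrounDRb::MetaWorker
  set_worker_name :page_worker

  def fetch_page(url)
    # process_block runs on a thread-pool thread; response_block runs back
    # in the worker loop once the result is popped off the result queue
    process_block  = lambda { |u| Net::HTTP.get(URI.parse(u)) }
    response_block = lambda { |body| cache_set(job_key, body.length) }
    thread_pool.run_concurrent(url, process_block, response_block)
  end
end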

2008-02-28 hemant kumar <hemant@shire>

* Fixed some memory leaks.

* Implemented a cleaner API for invoking tasks in workers.

* Updated the documentation.

2008-02-25 hemant kumar <hemant@shire>

* Committed a patch by Alex that lets BackgrounDRb take command line arguments, including loading specific environments
through a command line argument.

2008-02-14 hemant kumar <hemant@shire>

* Added test cases for Cron Triggers, Meta Workers and more. We are heading towards proper code coverage with specs.

* Added preliminary support for starting a worker on demand through the scheduler. What this means is, when you have a worker that is
scheduled very infrequently and you don't want the worker to persist, you can ask BackgrounDRb to restart the worker on each schedule.

* Fixed some unreported issues with writing data to the socket between workers.

* Fixed issues with too many open connections, because connections were not getting closed. BackgrounDRb now opens only one connection, which is
reused throughout the lifecycle of the Rails application.

* Fixed all outstanding issues with Cron triggers; BackgrounDRb now explicitly depends on the "chronic" gem.

* Removed the Framework directory; BackgrounDRb now explicitly depends on the packet gem.


9 changes: 9 additions & 0 deletions doc/content/content.txt
@@ -40,6 +40,15 @@ you can install the plugin from git:
<pre class="multiline">
git clone git://github.com/gnufied/backgroundrb.git </pre>

Also, to run the git version of BackgrounDRb you will need the git version of packet.

<pre class="multiline">
git clone git://github.com/gnufied/packet.git
cd packet;rake gem
cd pkg; sudo gem install --local packet-0.1.6.gem</pre>

If you get an error while building the gem, you probably need to install the RSpec gem.

p(sub-title). Installation using Piston

<pre class="boxed">piston import http://svn.devjavu.com/backgroundrb/trunk/ backgroundrb </pre>
12 changes: 8 additions & 4 deletions lib/backgroundrb/bdrb_connection.rb
@@ -16,8 +16,8 @@ def initialize
@mutex = Mutex.new
end

- def worker(worker_name,job_key = nil)
-   RailsWorkerProxy.worker(worker_name,job_key,self)
+ def worker(worker_name,worker_key = nil)
+   RailsWorkerProxy.worker(worker_name,worker_key,self)
end

def establish_connection
@@ -96,7 +96,7 @@ def new_worker p_data
p_data[:type] = :start_worker
dump_object(p_data)
close_connection
- p_data[:job_key]
+ p_data[:worker_key]
end

def worker_info(p_data)
@@ -108,6 +108,10 @@ def worker_info(p_data)
bdrb_response
end

def ask_result options = {}
  # stub added in this commit; result retrieval is not implemented yet
end


def all_worker_info
p_data = { }
@@ -156,7 +160,7 @@ def ask_status(p_data)
end

def read_from_bdrb(timeout = 3)
- #@tokenizer = Packet::BinParser.new
+ # @tokenizer = Packet::BinParser.new
begin
ret_val = select([@connection],nil,nil,timeout)
return nil unless ret_val
16 changes: 8 additions & 8 deletions lib/backgroundrb/rails_worker_proxy.rb
@@ -1,12 +1,12 @@
module BackgrounDRb
class RailsWorkerProxy
- attr_accessor :worker_name, :worker_method, :data, :job_key,:middle_man
+ attr_accessor :worker_name, :worker_method, :data, :worker_key,:middle_man

- def self.worker(p_worker_name,p_job_key = nil,p_middle_man = nil)
+ def self.worker(p_worker_name,p_worker_key = nil,p_middle_man = nil)
t = new
t.worker_name = p_worker_name
t.middle_man = p_middle_man
- t.job_key = p_job_key
+ t.worker_key = p_worker_key
t
end

@@ -16,16 +16,16 @@ def method_missing(method_id,*args)
flag = args[1]
case worker_method
when :ask_status
- middle_man.ask_status(compact(:worker => worker_name,:job_key => job_key))
+ middle_man.ask_status(compact(:worker => worker_name,:worker_key => worker_key))
when :worker_info
- middle_man.worker_info(compact(:worker => worker_name,:job_key => job_key))
+ middle_man.worker_info(compact(:worker => worker_name,:worker_key => worker_key))
when :delete
- middle_man.delete_worker(compact(:worker => worker_name, :job_key => job_key))
+ middle_man.delete_worker(compact(:worker => worker_name, :worker_key => worker_key))
else
if flag
- middle_man.send_request(compact(:worker => worker_name,:job_key => job_key,:worker_method => worker_method,:data => data))
+ middle_man.send_request(compact(:worker => worker_name,:worker_key => worker_key,:worker_method => worker_method,:data => data))
else
- middle_man.ask_work(compact(:worker => worker_name,:job_key => job_key,:worker_method => worker_method,:data => data))
+ middle_man.ask_work(compact(:worker => worker_name,:worker_key => worker_key,:worker_method => worker_method,:data => data))
end
end
end
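
For reference, a hedged sketch of how the renamed proxy API reads from a Rails app. The worker name, key, method and data are hypothetical; MiddleMan.worker and the method_missing dispatch are from the code above:

# returns a RailsWorkerProxy bound to this worker/worker_key pair
proxy = MiddleMan.worker(:rss_worker, "user_42")
# unknown methods route through method_missing and become ask_work requests
proxy.fetch_feed("http://example.com/feed")
proxy.ask_status  # sent as :worker => :rss_worker, :worker_key => "user_42"
proxy.delete      # asks the master process to delete this worker
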
26 changes: 26 additions & 0 deletions server/lib/bdrb_result_storage.rb
@@ -2,6 +2,32 @@ module BackgrounDRb
# Uses memcache if specified in the configuration file,
# otherwise an in-memory hash
class ResultStorage
attr_accessor :storage_type
def initialize storage_type = nil
@cache = (storage_type == :memcache) ? memcache_instance : {}
end

def [] key
@cache[key]
end

def []= key,value
@cache[key] = value
end

def memcache_instance
require 'memcache'
memcache_options = {
:c_threshold => 10_000,
:compression => true,
:debug => false,
:namespace => 'backgroundrb_result_hash',
:readonly => false,
:urlencode => false
}
t_cache = MemCache.new(memcache_options)
t_cache.servers = CONFIG_FILE[:memcache].split(',')
t_cache
end
end
end
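
A minimal usage sketch of the new ResultStorage, assuming the in-memory mode (no :memcache servers configured; the key and value are hypothetical):

storage = BackgrounDRb::ResultStorage.new       # falls back to a plain Hash
storage[:rss_worker_result] = { :feeds => 42 }  # []= writes into the backing store
storage[:rss_worker_result]                     # => { :feeds => 42 }
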
26 changes: 16 additions & 10 deletions server/lib/bdrb_thread_pool.rb
@@ -1,27 +1,29 @@
module BackgrounDRb
class WorkData
attr_accessor :data,:block,:job_key # :job_key accessor needed because add_thread reads task.job_key
- def initialize(args,&block)
+ def initialize(args,job_key,&block)
@data = args
@job_key = job_key
@block = block
end
end

class ParallelData
- attr_accessor :data,:block,:response_block,:guid
- def initialize(args,block,response_block)
+ attr_accessor :data,:block,:response_block,:job_key
+ def initialize(args,job_key,block,response_block)
@data = args
@block = block
@response_block = response_block
- @guid = Packet::Guid.hexdigest
+ @job_key = job_key
end
end

class ResultData
- attr_accessor :data,:block
- def initialize args,&block
+ attr_accessor :data,:block,:job_key
+ def initialize args,job_key,&block
@data = args
@block = block
@job_key = job_key
end
end

@@ -59,23 +61,27 @@ def initialize(size,logger)
# assuming method is defined in rss_worker

def defer(*args,&block)
- @work_queue << WorkData.new(args,&block)
+ job_key = Thread.current[:job_key]
+ @work_queue << WorkData.new(args,job_key,&block)
end

# Same as defer, but can be used to run a block in a separate thread and collect results back
# in the main thread
- def fetch_parallely(args,process_block,response_block)
-   @work_queue << ParallelData.new(args,process_block,response_block)
+ def run_concurrent(args,process_block,response_block)
+   job_key = Thread.current[:job_key]
+   @work_queue << ParallelData.new(args,job_key,process_block,response_block)
end

def add_thread
@threads << Thread.new do
Thread.current[:job_key] = nil
while true
task = @work_queue.pop
job_key = task.job_key
@running_tasks << task
block_result = run_task(task)
if task.is_a? ParallelData
- @result_queue << ResultData.new(block_result,&task.response_block)
+ @result_queue << ResultData.new(block_result,job_key,&task.response_block)
end
@running_tasks.pop
end
32 changes: 16 additions & 16 deletions server/lib/master_worker.rb
@@ -47,18 +47,18 @@ def receive_data p_data

#
def pass_worker_info(t_data)
- worker_name_key = gen_worker_key(t_data[:worker],t_data[:job_key])
+ worker_name_key = gen_worker_key(t_data[:worker],t_data[:worker_key])
worker_instance = reactor.live_workers[worker_name_key]
- info_response = { :worker => t_data[:worker],:job_key => t_data[:job_key]}
+ info_response = { :worker => t_data[:worker],:worker_key => t_data[:worker_key]}
worker_instance ? (info_response[:status] = :running) : (info_response[:status] = :stopped)
send_object(info_response)
end

def all_worker_info(t_data)
info_response = []
reactor.live_workers.each do |key,value|
- job_key = (value.worker_key.to_s).gsub(/#{value.worker_name}_?/,"")
- info_response << { :worker => value.worker_name,:job_key => job_key,:status => :running }
+ worker_key = (value.worker_key.to_s).gsub(/#{value.worker_name}_?/,"")
+ info_response << { :worker => value.worker_name,:worker_key => worker_key,:status => :running }
end
send_object(info_response)
end
@@ -73,8 +73,8 @@ def query_all_worker_status(p_data)
# it could be a good idea to remove it here itself.
def delete_drb_worker(t_data)
worker_name = t_data[:worker]
- job_key = t_data[:job_key]
- worker_name_key = gen_worker_key(worker_name,job_key)
+ worker_key = t_data[:worker_key]
+ worker_name_key = gen_worker_key(worker_name,worker_key)
begin
worker_instance = reactor.live_workers[worker_name_key]
Process.kill('TERM',worker_instance.pid)
@@ -92,7 +92,7 @@ def start_worker_request(p_data)

def process_work(t_data)
worker_name = t_data[:worker]
- worker_name_key = gen_worker_key(worker_name,t_data[:job_key])
+ worker_name_key = gen_worker_key(worker_name,t_data[:worker_key])
t_data.delete(:worker)
t_data.delete(:type)
begin
@@ -109,15 +109,15 @@ def process_status(t_data)

def process_status(t_data)
worker_name = t_data[:worker]
- job_key = t_data[:job_key]
- worker_name_key = gen_worker_key(worker_name,job_key)
+ worker_key = t_data[:worker_key]
+ worker_name_key = gen_worker_key(worker_name,worker_key)
status_data = reactor.result_hash[worker_name_key.to_sym]
send_object(status_data)
end

def process_request(t_data)
worker_name = t_data[:worker]
- worker_name_key = gen_worker_key(worker_name,t_data[:job_key])
+ worker_name_key = gen_worker_key(worker_name,t_data[:worker_key])
t_data.delete(:worker)
t_data.delete(:type)
begin
@@ -166,9 +166,9 @@ def initialize
end
end

- def gen_worker_key(worker_name,job_key = nil)
-   return worker_name if job_key.nil?
-   return "#{worker_name}_#{job_key}".to_sym
+ def gen_worker_key(worker_name,worker_key = nil)
+   return worker_name if worker_key.nil?
+   return "#{worker_name}_#{worker_key}".to_sym
end
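# e.g., with hypothetical inputs:
#   gen_worker_key(:rss_worker)            # => :rss_worker
#   gen_worker_key(:rss_worker, "user_42") # => :rss_worker_user_42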


@@ -230,9 +230,9 @@ def reload_workers
def load_and_invoke(worker_name,p_method,data)
begin
require worker_name.to_s
- job_key = Packet::Guid.hexdigest
- @reactor.start_worker(:worker => worker_name,:job_key => job_key)
- worker_name_key = gen_worker_key(worker_name,job_key)
+ worker_key = Packet::Guid.hexdigest
+ @reactor.start_worker(:worker => worker_name,:worker_key => worker_key)
+ worker_name_key = gen_worker_key(worker_name,worker_key)
data_request = {:data => { :worker_method => p_method,:data => data[:data]},
:type => :request, :result => false
}
19 changes: 19 additions & 0 deletions server/lib/meta_worker.rb
@@ -110,8 +110,11 @@ def self.reload_on_schedule(flag = nil)
def worker_init
@config_file = BackgrounDRb::Config.read_config("#{RAILS_HOME}/config/backgroundrb.yml")
log_flag = @config_file[:backgroundrb][:debug_log].nil? ? true : @config_file[:backgroundrb][:debug_log]
# stores the job key of currently running job
Thread.current[:job_key] = nil
@logger = PacketLogger.new(self,log_flag)
@thread_pool = ThreadPool.new(pool_size || 20,@logger)
@result_queue = Queue.new

if(worker_options && worker_options[:schedule] && no_auto_load)
load_schedule_from_args
@@ -127,6 +130,8 @@ def worker_init
@logger.info "Schedules for worker loaded"
end

def job_key; Thread.current[:job_key]; end

# loads workers schedule from options supplied from rails
# a user may pass trigger arguments to dynamically define the schedule
def load_schedule_from_args
@@ -163,11 +168,15 @@ def process_request(p_data)
end
called_method_arity = self.method(user_input[:worker_method]).arity
result = nil

Thread.current[:job_key] = user_input[:job_key]

if called_method_arity != 0
result = self.send(user_input[:worker_method],user_input[:data])
else
result = self.send(user_input[:worker_method])
end

if p_data[:result]
result = "dummy_result" unless result
send_response(p_data,result) if can_dump?(result)
@@ -238,6 +247,15 @@ def send_response input,output
end
end

def cache_set key,value
t_result = ResultData.new(value,key)
@result_queue.push(t_result)
end

def cache_get key
# should fetch data from real source.
end

def unbind; end

def connection_completed; end
@@ -246,6 +264,7 @@ def check_for_thread_responses
10.times do
break if thread_pool.result_empty?
result_object = thread_pool.result_pop
Thread.current[:job_key] = result_object.job_key
(result_object.block).call(result_object.data)
end
end
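
Taken together, these additions let a method running inside a worker read the job key of the request it is serving and store a result under that key. A hedged sketch (the worker, method and data are hypothetical; job_key and cache_set are the helpers added above):

class ReportWorker < BackgrounDRb::MetaWorker
  set_worker_name :report_worker

  def build_report(data)
    summary = data.to_s.upcase  # stand-in for real work
    # job_key reads Thread.current[:job_key], set in process_request;
    # cache_set wraps the value in a ResultData tagged with that key
    cache_set(job_key, summary)
  end
end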
