Permalink
Browse files

defer no longer takes blocks

  • Loading branch information...
1 parent a71c0ca commit ce8e3478bedda55187973b6c5ef5927d33578a16 @gnufied committed Jul 2, 2008
@@ -115,7 +115,14 @@ def new_worker options = {}
raise NoServerAvailable.new("No BackgrounDRb server is found running") unless succeeded
end
- def choose_server
+ def choose_server(host_info = nil)
+ if host_info
+ return @backend_connections if host_info == :all
+ conn = @backend_connections.detect { |x| x.server_info == host_info }
+ raise NoServerAvailable.new("BackgrounDRb server is not found running on #{host_info}") unless conn
+ return conn
+ end
+
if @round_robin.empty?
@round_robin = (0...@backend_connections.length).to_a
end
@@ -11,10 +11,13 @@ def initialize(p_worker_name,p_worker_key = nil,p_middle_man = nil)
def method_missing(method_id,*args)
worker_method = method_id
- data = args
+ arguments = *args
+
result = nil
- connection = middle_man.choose_server
+
+ connection = (Hash === arguments ) ? middle_man.choose_server(arguments[:host]) : middle_man.choose_server
@tried_connections << connection.server_info
+
begin
result = invoke_on_connection(connection,worker_method,data)
rescue BdrbConnError => e
@@ -1,42 +1,23 @@
module BackgrounDRb
class WorkData
- attr_accessor :data,:block,:job_key
- def initialize(args,job_key,&block)
- @data = args
- @job_key = job_key
- @block = block
- end
- end
-
- class ParallelData
- attr_accessor :data,:block,:response_block,:job_key
- def initialize(args,job_key,block,response_block)
- @data = args
- @block = block
- @response_block = response_block
- @job_key = job_key
- end
- end
-
- class ResultData
- attr_accessor :data,:block,:job_key
- def initialize args,job_key,&block
- @data = args
- @block = block
+ attr_accessor :args,:block,:job_method)
+ def initialize(args,job_key,job_method)
+ @args = args
@job_key = job_key
+ @job_method = job_method
end
end
class ThreadPool
attr_accessor :size,:threads,:work_queue,:logger
- attr_accessor :result_queue
- def initialize(size,logger)
+ attr_accessor :result_queue,:master
+
+ def initialize(master,size,logger)
+ @master = master
@logger = logger
@size = size
@threads = []
@work_queue = Queue.new
- @running_tasks = Queue.new
- @result_queue = Queue.new
@size.times { add_thread }
end
@@ -60,16 +41,9 @@ def initialize(size,logger)
# MiddleMan.ask_work(:worker => :rss_worker, :worker_method => :fetch_url, :data => "www.example.com")
# assuming method is defined in rss_worker
- def defer(*args,&block)
+ def defer(method_name,args = nil)
job_key = Thread.current[:job_key]
- @work_queue << WorkData.new(args,job_key,&block)
- end
-
- # Same as defer, but can be used to run a block in a seperate thread and collect results back
- # in main thread
- def run_concurrent(args,process_block,response_block)
- job_key = Thread.current[:job_key]
- @work_queue << ParallelData.new(args,job_key,process_block,response_block)
+ @work_queue << WorkData.new(args,job_key,method_name)
end
def add_thread
@@ -78,32 +52,21 @@ def add_thread
while true
task = @work_queue.pop
Thread.current[:job_key] = task.job_key
- @running_tasks << task
block_result = run_task(task)
- @running_tasks.pop
end
end
end
- def result_empty?
- return true if @result_queue.empty?
- return false
- end
-
- def result_pop
- @result_queue.pop
- end
-
def run_task task
- block_arity = task.block.arity
+ block_arity = master.method(task.job_method).arity
begin
ActiveRecord::Base.verify_active_connections!
- t_data = task.data
+ t_data = task.args
result = nil
if block_arity != 0
- result = t_data.is_a?(Array) ? (task.block.call(*t_data)) : (task.block.call(t_data))
+ result = master.send(task.job_method,task.args)
else
- result = task.block.call
+ result = master.send(task.job_method)
end
return result
rescue
@@ -112,15 +75,6 @@ def run_task task
return nil
end
end
+ end #end of class ThreadPool
+end # end of module BackgrounDRb
- # method ensures exclusive run of deferred tasks for 2 seconds, so as they do get a chance to run.
- def exclusive_run
- if @running_tasks.empty? && @work_queue.empty?
- return
- else
- sleep(0.05)
- return
- end
- end
- end
-end
@@ -114,7 +114,7 @@ def worker_init
# stores the job key of currently running job
Thread.current[:job_key] = nil
@logger = PacketLogger.new(self,log_flag)
- @thread_pool = ThreadPool.new(pool_size || 20,@logger)
+ @thread_pool = ThreadPool.new(self,pool_size || 20,@logger)
@cache = ResultStorage.new(worker_name,worker_options[:worker_key],BDRB_CONFIG[:backgroundrb][:result_storage])
if(worker_options && worker_options[:schedule] && no_auto_load)

0 comments on commit ce8e347

Please sign in to comment.