Skip to content

HTTPS clone URL

Subversion checkout URL

You can clone with
or
.
Download ZIP
Browse files

Merge commit 'origin/master'

  • Loading branch information...
commit f391764189515d256cf77861e04251b702e48b3a 2 parents 3ab89f7 + 8e7af36
@gnufied authored
View
12 Rakefile
@@ -4,7 +4,6 @@ require 'rake/testtask'
require 'rake/rdoctask'
require 'spec/rake/spectask'
require 'rake/contrib/sshpublisher'
-require "darkfish-rdoc"
desc 'Default: run unit tests.'
task :default => :test
@@ -32,21 +31,14 @@ desc 'Generate documentation for the backgroundrb plugin.'
Rake::RDocTask.new(:rdoc) do |rdoc|
rdoc.rdoc_dir = 'doc/output/manual'
rdoc.title = 'Backgroundrb'
- #rdoc.options << '--line-numbers' << '--inline-source'
+ rdoc.options << '--line-numbers' << '--inline-source'
rdoc.rdoc_files.include('README')
rdoc.rdoc_files.include('LICENSE')
rdoc.rdoc_files.include('lib/*.rb')
rdoc.rdoc_files.include('lib/backgroundrb/*.rb')
rdoc.rdoc_files.include('server/*.rb')
rdoc.rdoc_files.include('server/lib/*.rb')
- #rdoc.template = 'jamis'
- rdoc.options += [
- '-w', '4',
- '-SHN',
- '-f', 'darkfish', # This bit
- '-m', 'README',
- ]
-
+ rdoc.template = 'jamis'
end
module Rake
View
2  doc/content/content.txt
@@ -41,7 +41,7 @@ p(sub-title). Installation using Piston
%(entry-title)<a name="configuration"> Configuration </a>%
-After getting the plugin, you must copy it into your vendor/rails and
+After getting the plugin, you must copy it into your vendor/plugins and
then configure it for use. _BackgrounDRb_ comes with a rake task for
automating plugin configuration. Before running rake task, remove if
any old @backgroundrb@ or @load_worker_env.rb@ script is there in script folder of your rails
View
2  examples/backgroundrb.yml
@@ -10,7 +10,7 @@
:persistent_disabled: false # turn this off if your application doesn't use backgroundrb's persistent/enqueued tasks system
:persistent_delay: 10 # the time (seconds) between each time backgroundrb checks the database for enqueued tasks
-:memcache: "10.0.0.1:11211,10.0.0.2:11211" #=> location of mecache clusters seperated by comma
+:memcache: "10.0.0.1:11211,10.0.0.2:11211" #=> location of memcache clusters separated by comma
# following section is totally optional, and only useful if you are trying to cluster of backgroundrb server
# if you do not specify this section backgroundrb will assume that, from rails you are connecting to the
View
1  lib/backgroundrb.rb
@@ -17,6 +17,7 @@
require "backgroundrb/bdrb_connection"
require "backgroundrb/bdrb_cluster_connection"
require "backgroundrb/bdrb_start_stop"
+require "backgroundrb/bdrb_result"
MiddleMan = BackgrounDRb::ClusterConnection.new
View
4 lib/backgroundrb/bdrb_cluster_connection.rb
@@ -129,9 +129,11 @@ def all_worker_info
def new_worker(options = {})
update_stats
succeeded = false
+ result = nil
+
@backend_connections.each do |connection|
begin
- connection.new_worker(options)
+ result = connection.new_worker(options)
succeeded = true
rescue BdrbConnError; end
end
View
8 lib/backgroundrb/bdrb_conn_error.rb
@@ -18,4 +18,12 @@ def initialize(message)
# raised, when said task was submitted without a job key, whereas
# nature of the task requires a job key
class NoJobKey < RuntimeError; end
+
+ # raised if worker throws some error
+ class RemoteWorkerError < RuntimeError
+ attr_accessor :message
+ def initialize message
+ @message = message
+ end
+ end
end
View
5 lib/backgroundrb/bdrb_connection.rb
@@ -84,7 +84,10 @@ def close_connection
def ask_work p_data
p_data[:type] = :async_invoke
dump_object(p_data)
+ bdrb_response = nil
+ @mutex.synchronize { bdrb_response = read_from_bdrb() }
close_connection
+ bdrb_response
end
def new_worker p_data
@@ -170,7 +173,7 @@ def send_request(p_data)
bdrb_response = nil
@mutex.synchronize { bdrb_response = read_from_bdrb(nil) }
close_connection
- bdrb_response ? bdrb_response[:data] : nil
+ bdrb_response
end
end
end
View
5 lib/backgroundrb/bdrb_job_queue.rb
@@ -5,6 +5,7 @@ class BdrbJobQueue < ActiveRecord::Base
# find next task from the table
def self.find_next(worker_name,worker_key = nil)
returned_job = nil
+ ActiveRecord::Base.verify_active_connections!
transaction do
unless worker_key
#use ruby time stamps for time calculations as db might have different times than what is calculated by ruby/rails
@@ -25,6 +26,7 @@ def self.find_next(worker_name,worker_key = nil)
# release a job and mark it to be unfinished and free.
# useful, if inside a worker, processing of this job failed and you want it to process later
def release_job
+ ActiveRecord::Base.verify_active_connections!
self.class.transaction do
self.taken = 0
self.started_at = nil
@@ -34,6 +36,7 @@ def release_job
# insert a new job for processing. jobs added will be automatically picked by the appropriate worker
def self.insert_job(options = { })
+ ActiveRecord::Base.verify_active_connections!
transaction do
options.merge!(:submitted_at => Time.now.utc,:finished => 0,:taken => 0)
t_job = new(options)
@@ -43,6 +46,7 @@ def self.insert_job(options = { })
# remove a job from table
def self.remove_job(options = { })
+ ActiveRecord::Base.verify_active_connections!
transaction do
t_job_id = find(:first, :conditions => options.merge(:finished => 0,:taken => 0),:lock => true)
delete(t_job_id)
@@ -51,6 +55,7 @@ def self.remove_job(options = { })
# Mark a job as finished
def finish!
+ ActiveRecord::Base.verify_active_connections!
self.class.transaction do
self.finished = 1
self.finished_at = Time.now.utc
View
19 lib/backgroundrb/bdrb_result.rb
@@ -0,0 +1,19 @@
+module BackgrounDRb
+ class Result
+ def initialize results
+ @results = results
+ end
+
+ def async_response?
+ !(@results[:result] == true)
+ end
+
+ def sync_response?
+ (@results[:result] == true)
+ end
+
+ def error?
+ !(@results[:result_flag] == "ok")
+ end
+ end
+end
View
26 lib/backgroundrb/bdrb_start_stop.rb
@@ -43,14 +43,34 @@ def start
op.write(Process.pid().to_s)
op.close
if BDRB_CONFIG[:backgroundrb][:log].nil? or BDRB_CONFIG[:backgroundrb][:log] != 'foreground'
- log_file = File.open(SERVER_LOGGER,"w+")
- [STDIN, STDOUT, STDERR].each {|desc| desc.reopen(log_file)}
+ redirect_io(SERVER_LOGGER)
end
-
BackgrounDRb::MasterProxy.new()
end
end
+ # Free file descriptors and
+ # point them somewhere sensible
+ # STDOUT/STDERR should go to a logfile
+ def redirect_io(logfile_name)
+ begin; STDIN.reopen "/dev/null"; rescue ::Exception; end
+
+ if logfile_name
+ begin
+ STDOUT.reopen logfile_name, "a"
+ STDOUT.sync = true
+ rescue ::Exception
+ begin; STDOUT.reopen "/dev/null"; rescue ::Exception; end
+ end
+ else
+ begin; STDOUT.reopen "/dev/null"; rescue ::Exception; end
+ end
+
+ begin; STDERR.reopen STDOUT; rescue ::Exception; end
+ STDERR.sync = true
+ end
+
+
def stop
pid_files = Dir["#{RAILS_HOME}/tmp/pids/backgroundrb_*.pid"]
pid_files.each { |x| kill_process(x) }
View
27 lib/backgroundrb/rails_worker_proxy.rb
@@ -16,6 +16,10 @@ def method_missing(method_id,*args)
arguments = args.first
arg,job_key,host_info,scheduled_at = arguments && arguments.values_at(:arg,:job_key,:host,:scheduled_at)
+
+ # allow both arg and args
+ arg ||= arguments[:args]
+
new_schedule = (scheduled_at && scheduled_at.respond_to?(:utc)) ? scheduled_at.utc : Time.now.utc
if worker_method =~ /^async_(\w+)/
@@ -76,8 +80,27 @@ def run_method host_info,method_name,worker_options = {}
retry
end
end
- return nil if method_name == :ask_work
- return_result(result)
+ #return nil if method_name == :ask_work
+ process_result(return_result(result))
+ end
+
+ def process_result t_result
+ case t_result
+ when Hash
+ if(t_result[:result] == true && t_result[:type] == :response)
+ if(t_result[:result_flag] == "ok")
+ return t_result[:data]
+ else
+ raise RemoteWorkerError.new("Error while executing worker method")
+ end
+ elsif(t_result[:result_flag] == "ok")
+ "ok"
+ elsif(t_result[:result_flag] == "error")
+ raise RemoteWorkerError.new("Error while executing worker method")
+ end
+ when Array
+ t_result
+ end
end
# choose a backgroundrb server connection and invoke worker method on it.
View
48 release_notes.org
@@ -0,0 +1,48 @@
+Hi,
+
+BackgrounDRb 1.2 is being unleashed.
+
+* New features:
+
+** Exceptions/errors are now popped out at the earliest moment
+ in the client side itself. For example:
+
+>> MiddleMan.worker(:foo_worker).async_bar(:args => {:age => 10})
+BackgrounDRb::RemoteWorkerError: BackgrounDRb::RemoteWorkerError
+
+ Above exception is thrown because remote worker doesn't have method
+ "bar" defined on it.
+
+Similarly:
+
+>> MiddleMan.worker(:foo_worker).checksum(:args => {:age => "lolz"})
+BackgrounDRb::RemoteWorkerError: BackgrounDRb::RemoteWorkerError
+
+ Above exception is thrown because remote worker's checksum method
+ expects an integer as an argument.
+
+ For asynchronous method calls, BackgrounDRb doesn't check if method
+ ran successfully, it only checks existence of methods on remote
+ worker. For sync method calls it checks if method ran successfully
+ or not.
+
+** Its possible to have per worker configuration options now.
+
+* Bug Fixes
+
+** Much better error/exception handling. Rogue worker methods shouldn't
+ crash the worker now. All the unhandled exceptions and dispatch
+ errors can be found in debug log file.
+
+** Fixes for postgres db with persistent job queues.
+
+** Switched to lightweight Queue implementation for tasks enqueued
+ to thread pool.
+
+** Fixes for database dropped connections while running tasks from
+ persistent queues.
+
+** Fixes for newer Rails versions.
+
+**
+
View
65 server/lib/bdrb_thread_pool.rb
@@ -1,4 +1,7 @@
module BackgrounDRb
+
+ class InterruptedException < RuntimeError ; end
+
class WorkData
attr_accessor :args,:block,:job_method,:persistent_job_id,:job_key
def initialize(args,job_key,job_method,persistent_job_id)
@@ -18,7 +21,9 @@ def initialize(master,size,logger)
@logger = logger
@size = size
@threads = []
- @work_queue = Queue.new
+ @work_queue = []
+ @mutex = Monitor.new
+ @cv = @mutex.new_cond
@size.times { add_thread }
end
@@ -43,9 +48,13 @@ def initialize(master,size,logger)
# assuming method is defined in rss_worker
def defer(method_name,args = nil)
- job_key = Thread.current[:job_key]
- persistent_job_id = Thread.current[:persistent_job_id]
- @work_queue << WorkData.new(args,job_key,method_name,persistent_job_id)
+ @mutex.synchronize do
+ job_key = Thread.current[:job_key]
+ persistent_job_id = Thread.current[:persistent_job_id]
+ @cv.wait_while { @work_queue.size >= size }
+ @work_queue.push(WorkData.new(args,job_key,method_name,persistent_job_id))
+ @cv.broadcast
+ end
end
# Start worker threads
@@ -54,10 +63,22 @@ def add_thread
Thread.current[:job_key] = nil
Thread.current[:persistent_job_id] = nil
while true
- task = @work_queue.pop
- Thread.current[:job_key] = task.job_key
- Thread.current[:persistent_job_id] = task.persistent_job_id
- block_result = run_task(task)
+ begin
+ task = nil
+ @mutex.synchronize do
+ @cv.wait_while { @work_queue.size == 0 }
+ task = @work_queue.pop
+ @cv.broadcast
+ end
+ if task
+ Thread.current[:job_key] = task.job_key
+ Thread.current[:persistent_job_id] = task.persistent_job_id
+ block_result = run_task(task)
+ end
+ rescue BackgrounDRb::InterruptedException
+ STDERR.puts("BackgrounDRb thread interrupted: #{Thread.current.inspect}")
+ STDERR.flush
+ end
end
end
end
@@ -66,7 +87,7 @@ def add_thread
def run_task task
block_arity = master.method(task.job_method).arity
begin
- ActiveRecord::Base.verify_active_connections!
+ check_db_connection
t_data = task.args
result = nil
if block_arity != 0
@@ -75,12 +96,32 @@ def run_task task
result = master.send(task.job_method)
end
return result
- rescue
- logger.info($!.to_s)
- logger.info($!.backtrace.join("\n"))
+ rescue BackgrounDRb::InterruptedException => e
+ # Don't log, just re-raise
+ raise e
+ rescue Object => bdrb_error
+ log_exception(bdrb_error)
return nil
end
end
+
+ def log_exception exception_object
+ STDERR.puts exception_object.to_s
+ STDERR.puts exception_object.backtrace.join("\n")
+ STDERR.flush
+ end
+
+
+ # Periodic check for lost database connections and closed connections
+ def check_db_connection
+ begin
+ ActiveRecord::Base.verify_active_connections! if defined?(ActiveRecord)
+ rescue Object => bdrb_error
+ log_exception(bdrb_error)
+ end
+ end
+
+
end #end of class ThreadPool
end # end of module BackgrounDRb
View
13 server/lib/master_proxy.rb
@@ -112,10 +112,12 @@ def load_and_invoke(worker_name,p_method,data)
t_worker.send_request(data_request)
t_worker.send_request(exit_request)
end
- rescue LoadError
+ rescue LoadError => e
puts "no such worker #{worker_name}"
- rescue MissingSourceFile
+ puts e.backtrace.join("\n")
+ rescue MissingSourceFile => e
puts "no such worker #{worker_name}"
+ puts e.backtrace.join("\n")
return
end
end
@@ -124,7 +126,12 @@ def load_rails_env
db_config_file = YAML.load(ERB.new(IO.read("#{RAILS_HOME}/config/database.yml")).result)
run_env = ENV["RAILS_ENV"]
ActiveRecord::Base.establish_connection(db_config_file[run_env])
- ActiveRecord::Base.allow_concurrency = true
+
+ if(Object.const_defined?(:Rails) && Rails.version < "2.2.2")
+ ActiveRecord::Base.allow_concurrency = true
+ elsif(Object.const_defined?(:RAILS_GEM_VERSION) && RAILS_GEM_VERSION < "2.2.2")
+ ActiveRecord::Base.allow_concurrency = true
+ end
end
def check_for_ruby_version; RUBY_VERSION >= "1.8.5"; end
View
20 server/lib/master_worker.rb
@@ -47,9 +47,9 @@ def receive_data p_data
else; debug_logger.info("Invalid request")
end
end
- rescue Exception => e
- debug_logger.info(e)
- debug_logger.info(e.backtrace.join("\n"))
+ rescue Object => bdrb_error
+ debug_logger.info(bdrb_error)
+ debug_logger.info(bdrb_error.backtrace.join("\n"))
send_object(nil)
end
end
@@ -103,19 +103,32 @@ def start_worker_request(p_data)
def async_method_invoke(t_data)
worker_name = t_data[:worker]
worker_name_key = gen_worker_key(worker_name,t_data[:worker_key])
+
+ unless worker_methods(worker_name_key).include?(t_data[:worker_method])
+ send_object(:result_flag => "error")
+ return
+ end
+
t_data.delete(:worker)
t_data.delete(:type)
begin
ask_worker(worker_name_key,:data => t_data, :type => :request, :result => false)
+ send_object(:result_flag => "ok")
rescue Packet::DisconnectError => sock_error
+ send_object(:result_flag => "error")
reactor.live_workers.delete(worker_name_key)
rescue
+ send_object(:result_flag => "error")
debug_logger.info($!.message)
debug_logger.info($!.backtrace.join("\n"))
return
end
end
+ def worker_methods worker_name_key
+ reactor.live_workers[worker_name_key].invokable_worker_methods
+ end
+
# Given a cache key, ask the worker for result stored in it.
# If you are using Memcache for result storage, this method won't be
# called at all and bdrb client library will directly fetch
@@ -155,6 +168,7 @@ def method_invoke(t_data)
# Receieve responses from workers and dispatch them back to the client
def worker_receive p_data
+ p_data[:result_flag] ||= "ok"
send_object(p_data)
end
View
209 server/lib/meta_worker.rb
@@ -108,11 +108,10 @@ def worker_init
raise "Invalid worker name" if !worker_name
Thread.abort_on_exception = true
- log_flag = BDRB_CONFIG[:backgroundrb][:debug_log].nil? ? true : BDRB_CONFIG[:backgroundrb][:debug_load_rails_env]
-
# stores the job key of currently running job
Thread.current[:job_key] = nil
- @logger = PacketLogger.new(self,log_flag)
+ initialize_logger
+
@thread_pool = ThreadPool.new(self,pool_size || 20,@logger)
t_worker_key = worker_options && worker_options[:worker_key]
@@ -125,12 +124,62 @@ def worker_init
new_load_schedule if @my_schedule
end
if respond_to?(:create)
- create_arity = method(:create).arity
- (create_arity == 0) ? create : create(worker_options[:data])
+ invoke_user_method(:create,worker_options[:data])
+ end
+ if run_persistent_jobs?
+ add_periodic_timer(persistent_delay.to_i) { check_for_enqueued_tasks }
+ end
+ write_pid_file(t_worker_key)
+ end
+
+ def write_pid_file t_worker_key
+ key = [worker_name,t_worker_key].compact.join('_')
+ pid_file = "#{RAILS_HOME}/tmp/pids/backgroundrb_#{BDRB_CONFIG[:backgroundrb][:port]}_worker_#{key}.pid"
+ op = File.open(pid_file, "w")
+ op.write(Process.pid().to_s)
+ op.close
+ end
+
+ def initialize_logger
+ log_flag = BDRB_CONFIG[:backgroundrb][:debug_log].nil? ? true : BDRB_CONFIG[:backgroundrb][:debug_load_rails_env]
+ if BDRB_CONFIG[:backgroundrb][:logging_logger].nil?
+ @logger = PacketLogger.new(self,log_flag)
+ else
+ log_config = BDRB_CONFIG[:backgroundrb][:logging_logger]
+ @logger = Logging::Logger[log_config[:name]]
+ @logger.trace = log_config[:trace]
+ @logger.additive = log_config[:additive]
+
+ log_config[:appenders].keys.each do |key|
+ appender_config = log_config[:appenders][key]
+
+ logger_options = {
+ :filename => "#{RAILS_HOME}/#{appender_config[:filename]}",
+ :age => appender_config[:age],
+ :size => appender_config[:size],
+ :keep => appender_config[:keep],
+ :safe => appender_config[:safe],
+ :layout => Logging::Layouts::Pattern.new(:pattern => appender_config[:layout_pattern])
+ }
+ appender = "Logging::Appenders::#{appender_config[:type]}".constantize.new("backgroundrb_#{key}",logger_options)
+ @logger.add_appenders(appender)
+ end
end
- return if BDRB_CONFIG[:backgroundrb][:persistent_disabled]
- delay = BDRB_CONFIG[:backgroundrb][:persistent_delay] || 5
- add_periodic_timer(delay.to_i) { check_for_enqueued_tasks }
+ end
+
+ def puts msg
+ STDOUT.puts msg
+ STDOUT.flush
+ end
+
+ # Returns the persistent job queue check delay for this worker
+ def persistent_delay
+ get_config_value(:persistent_delay, 5)
+ end
+
+ # Returns true if persistent jobs should be run for this worker.
+ def run_persistent_jobs?
+ !get_config_value(:persistent_disabled, false)
end
# return job key from thread global variable
@@ -184,29 +233,24 @@ def process_request(p_data)
user_input = p_data[:data]
if (user_input[:worker_method]).nil? or !respond_to?(user_input[:worker_method])
result = nil
- send_response(p_data,result)
+ puts "Trying to invoke invalid worker method on worker #{worker_name}"
+ send_response(p_data,result,"error")
return
end
- called_method_arity = self.method(user_input[:worker_method]).arity
result = nil
Thread.current[:job_key] = user_input[:job_key]
- begin
- if called_method_arity != 0
- result = self.send(user_input[:worker_method],user_input[:arg])
- else
- result = self.send(user_input[:worker_method])
- end
- rescue
- logger.info($!.to_s)
- logger.info($!.backtrace.join("\n"))
- end
+ result,result_flag = invoke_user_method(user_input[:worker_method],user_input[:arg])
if p_data[:result]
result = "dummy_result" if result.nil?
- send_response(p_data,result) if can_dump?(result)
+ if can_dump?(result)
+ send_response(p_data,result,result_flag)
+ else
+ send_response(p_data,"dummy_result","error")
+ end
end
end
@@ -240,24 +284,48 @@ def new_load_schedule
# send the response back to master process and hence to the client
# if there is an error while dumping the object, send "invalid_result_dump_check_log"
- def send_response input,output
+ def send_response input,output,result_flag = "ok"
input[:data] = output
input[:type] = :response
+ input[:result_flag] = result_flag
begin
send_data(input)
- rescue TypeError => e
- logger.info(e.to_s)
- logger.info(e.backtrace.join("\n"))
- input[:data] = "invalid_result_dump_check_log"
- send_data(input)
- rescue
- logger.info($!.to_s)
- logger.info($!.backtrace.join("\n"))
+ rescue Object => bdrb_error
+ log_exception(bdrb_error)
input[:data] = "invalid_result_dump_check_log"
+ input[:result_flag] = "error"
send_data(input)
end
end
+ def log_exception exception_object
+ STDERR.puts exception_object.to_s
+ STDERR.puts exception_object.backtrace.join("\n")
+ STDERR.flush
+ end
+
+ def invoke_user_method user_method,args
+ if self.respond_to?(user_method)
+ called_method_arity = self.method(user_method).arity
+ t_result = nil
+ begin
+ if(called_method_arity != 0)
+ t_result = self.send(user_method,args)
+ else
+ t_result = self.send(user_method)
+ end
+ [t_result,"ok"]
+ rescue Object => bdrb_error
+ puts "Error calling method #{user_method} with #{args} on worker #{worker_name}"
+ log_exception(bdrb_error)
+ [t_result,"error"]
+ end
+ else
+ puts "Trying to invoke method #{user_method} with #{args} on worker #{worker_name} failed because no such method is defined on the worker"
+ [nil,"error"]
+ end
+ end
+
# called when connection is closed
def unbind; end
@@ -265,29 +333,27 @@ def connection_completed; end
# Check for enqueued tasks and invoke appropriate methods
def check_for_enqueued_tasks
- if worker_key && !worker_key.empty?
- task = BdrbJobQueue.find_next(worker_name.to_s,worker_key.to_s)
- else
- task = BdrbJobQueue.find_next(worker_name.to_s)
- end
- return unless task
- if self.respond_to? task.worker_method
- Thread.current[:persistent_job_id] = task[:id]
- Thread.current[:job_key] = task[:job_key]
- called_method_arity = self.method(task.worker_method).arity
- args = load_data(task.args)
- begin
- if called_method_arity != 0
- self.send(task.worker_method,args)
- else
- self.send(task.worker_method)
- end
- rescue
- logger.info($!.to_s)
- logger.info($!.backtrace.join("\n"))
+ while (task = get_next_task)
+ if self.respond_to? task.worker_method
+ Thread.current[:persistent_job_id] = task[:id]
+ Thread.current[:job_key] = task[:job_key]
+ args = load_data(task.args)
+ invoke_user_method(task.worker_method,args)
+ else
+ task.release_job
end
+ # Unless configured to loop on persistent tasks, run only
+ # once, and then break
+ break unless BDRB_CONFIG[:backgroundrb][:persistent_multi]
+ end
+ end
+
+ # Get the next enqueued job
+ def get_next_task
+ if worker_key && !worker_key.empty?
+ BdrbJobQueue.find_next(worker_name.to_s,worker_key.to_s)
else
- task.release_job
+ BdrbJobQueue.find_next(worker_name.to_s)
end
end
@@ -301,12 +367,7 @@ def check_for_timer_events
time_now = Time.now.to_i
if value[:runtime] < time_now
check_db_connection
- begin
- (t_data = value[:data]) ? send(key,t_data) : send(key)
- rescue
- logger.info($!.to_s)
- logger.info($!.backtrace.join("\n"))
- end
+ invoke_user_method(key,value[:data])
t_time = value[:trigger].fire_after_time(Time.now)
value[:runtime] = t_time.to_i
end
@@ -317,18 +378,44 @@ def check_for_timer_events
def check_db_connection
begin
ActiveRecord::Base.verify_active_connections! if defined?(ActiveRecord)
- rescue
- logger.info($!.to_s)
- logger.info($!.backtrace.join("\n"))
+ rescue Object => bdrb_error
+ log_exception(bdrb_error)
end
end
private
+
+ # Returns the local configuration hash for this worker. Returns an
+ # empty hash if no local config exists.
+ def worker_config
+ if BDRB_CONFIG[:workers] && BDRB_CONFIG[:workers][worker_name.to_sym]
+ BDRB_CONFIG[:workers][worker_name.to_sym]
+ else
+ {}
+ end
+ end
+
+ # Returns the appropriate configuration value, based on both the
+ # global config and the per-worker configuration for this worker.
+ def get_config_value(key_sym, default)
+ if !worker_config[key_sym].nil?
+ worker_config[key_sym]
+ elsif !BDRB_CONFIG[:backgroundrb][key_sym].nil?
+ BDRB_CONFIG[:backgroundrb][key_sym]
+ else
+ default
+ end
+ end
+
def load_rails_env
db_config_file = YAML.load(ERB.new(IO.read("#{RAILS_HOME}/config/database.yml")).result)
run_env = ENV["RAILS_ENV"]
ActiveRecord::Base.establish_connection(db_config_file[run_env])
- ActiveRecord::Base.allow_concurrency = true
+ if(Object.const_defined?(:Rails) && Rails.version < "2.2.2")
+ ActiveRecord::Base.allow_concurrency = true
+ elsif(Object.const_defined?(:RAILS_GEM_VERSION) && RAILS_GEM_VERSION < "2.2.2")
+ ActiveRecord::Base.allow_concurrency = true
+ end
end
end # end of class MetaWorker
View
2  test/client/test_worker_proxy.rb
@@ -21,7 +21,7 @@
specify "should let you invoke sync task methods" do
actual_conn = mock()
actual_conn.expects(:server_info).returns("localhost:11008")
- actual_conn.expects(:send_request).returns(20)
+ actual_conn.expects(:send_request).returns({ :data => 20, :result_flag => "ok",:result => true, :type => :response})
@cluster_conn.expects(:choose_server).returns(actual_conn)
a = @worker_proxy.hello_world(:args => "sucks")
a.should == 20
View
1  test/server/test_master_proxy.rb
@@ -24,6 +24,7 @@
@master_proxy.worker_triggers.should.not.be {}
assert @master_proxy.worker_triggers.keys.include?(:bar_worker)
assert @master_proxy.worker_triggers[:bar_worker].keys.include?(:do_job)
+
@master_proxy.worker_triggers[:bar_worker][:do_job].should.not.be { }
end
View
36 test/server/test_master_worker.rb
@@ -2,7 +2,7 @@
context "Master Worker in general should" do
- def dump_object data
+ def packet_dump data
t = Marshal.dump(data)
t.length.to_s.rjust(9,'0') + t
end
@@ -13,6 +13,7 @@ def dump_object data
class << @master_worker
attr_accessor :outgoing_data
attr_accessor :key,:live_workers,:excpetion_type
+ attr_accessor :going_to_user
def packet_classify(original_string)
word_parts = original_string.split('_')
@@ -35,6 +36,10 @@ def ask_worker key,data
end
end
+ def send_object data
+ @going_to_user = data
+ end
+
def start_worker data
@outgoing_data = data
end
@@ -45,7 +50,9 @@ def ask_for_exception type
end
class DummyLogger
- def method_missing method_id,*args; "boy"; end
+ def method_missing method_id,*args;
+ puts *args
+ end
end
logger = DummyLogger.new
@master_worker.debug_logger = logger
@@ -56,14 +63,14 @@ def method_missing method_id,*args; "boy"; end
:type=>:sync_invoke, :arg=>"boy", :worker=>:foo_worker, :worker_method=>"barbar"
}
@master_worker.expects(:method_invoke).with(sync_request).returns(nil)
- @master_worker.receive_data(dump_object(sync_request))
+ @master_worker.receive_data(packet_dump(sync_request))
end
specify "ignore errors while recreating object" do
sync_request = {
:type=>:sync_invoke, :arg=>"boy", :worker=>:foo_worker, :worker_method=>"barbar"
}
- foo = dump_object(sync_request)
+ foo = packet_dump(sync_request)
foo[0] = 'h'
@master_worker.expects(:method_invoke).never
@master_worker.receive_data(foo)
@@ -73,21 +80,24 @@ def method_missing method_id,*args; "boy"; end
b = {
:type=>:async_invoke, :arg=>"boy", :worker=>:foo_worker, :worker_method=>"barbar"
}
- @master_worker.receive_data(dump_object(b))
+
+ @master_worker.expects(:worker_methods).returns(["barbar"])
+ @master_worker.receive_data(packet_dump(b))
@master_worker.outgoing_data.should == {:type=>:request, :data=>{:worker_method=>"barbar", :arg=>"boy"}, :result=>false}
+ @master_worker.going_to_user.should == { :result_flag => "ok" }
@master_worker.key.should == :foo_worker
end
specify "should route sync requests and return results" do
a = {:type=>:sync_invoke, :arg=>"boy", :worker=>:foo_worker, :worker_method=>"barbar"}
- @master_worker.receive_data(dump_object(a))
+ @master_worker.receive_data(packet_dump(a))
@master_worker.outgoing_data.should == {:type=>:request, :data=>{:worker_method=>"barbar", :arg=>"boy"}, :result=>true}
@master_worker.key.should == :foo_worker
end
specify "should route start worker requests" do
d = {:worker_key=>"boy", :type=>:start_worker, :worker=>:foo_worker}
- @master_worker.receive_data(dump_object(d))
+ @master_worker.receive_data(packet_dump(d))
@master_worker.outgoing_data.should == {:type=>:start_worker, :worker_key=>"boy", :worker=>:foo_worker}
end
@@ -95,7 +105,7 @@ def method_missing method_id,*args; "boy"; end
specify "should run delete worker requests itself" do
e = {:worker_key=>"boy", :type=>:delete_worker, :worker=>:foo_worker}
@master_worker.expects(:delete_drb_worker).returns(nil)
- @master_worker.receive_data(dump_object(e))
+ @master_worker.receive_data(packet_dump(e))
end
specify "should route worker info requests" do
@@ -106,7 +116,7 @@ def method_missing method_id,*args; "boy"; end
t_reactor.expects(:live_workers).returns(live_workers)
@master_worker.expects(:send_object).with({:worker_key=>"boy", :worker=>:foo_worker, :status=>:stopped}).returns(true)
@master_worker.expects(:reactor).returns(t_reactor)
- @master_worker.receive_data(dump_object(g))
+ @master_worker.receive_data(packet_dump(g))
end
specify "should route all_worker_info requests" do
@@ -118,12 +128,12 @@ def method_missing method_id,*args; "boy"; end
@master_worker.expects(:send_object).returns(true)
@master_worker.expects(:reactor).returns(t_reactor)
- @master_worker.receive_data(dump_object(f))
+ @master_worker.receive_data(packet_dump(f))
end
specify "should route worker result requests" do
c = {:type=>:get_result, :worker=>:foo_worker, :job_key=>:start_message}
- @master_worker.receive_data(dump_object(c))
+ @master_worker.receive_data(packet_dump(c))
@master_worker.outgoing_data.should == {:type=>:get_result, :data=>{:job_key=>:start_message}, :result=>true}
end
@@ -135,13 +145,13 @@ def method_missing method_id,*args; "boy"; end
live_workers.expects(:delete).returns(true)
t_reactor.expects(:live_workers).returns(live_workers)
@master_worker.expects(:reactor).returns(t_reactor)
- @master_worker.receive_data(dump_object(c))
+ @master_worker.receive_data(packet_dump(c))
end
specify "should ignore generic exceptions while fetching results" do
c = {:type=>:get_result, :worker=>:foo_worker, :job_key=>:start_message}
@master_worker.ask_for_exception(:generic)
- @master_worker.receive_data(dump_object(c))
+ @master_worker.receive_data(packet_dump(c))
@master_worker.outgoing_data.should == nil
end
end
View
30 test/server/test_meta_worker.rb
@@ -3,11 +3,12 @@
require "chronic"
context "A Meta Worker should" do
- def dump_object data
- t = Marshal.dump(data)
- t.length.to_s.rjust(9,'0') + t
+ module Kernel
+ def packet_dump data
+ t = Marshal.dump(data)
+ t.length.to_s.rjust(9,'0') + t
+ end
end
-
setup do
options = {:schedules =>
{
@@ -64,7 +65,7 @@ def ivar(var)
specify "should invoke async tasks without sending results" do
a = {:data=>{:worker_method=>"who", :arg=>"rails",:job_key => "lol"}, :type=>:request, :result=>false, :client_signature=>9}
@meta_worker.expects(:who).with("rails").returns(nil)
- @meta_worker.receive_internal_data(dump_object(a))
+ @meta_worker.receive_internal_data(packet_dump(a))
Thread.current[:job_key].should == "lol"
end
@@ -75,8 +76,8 @@ def baz args
end
end
b = {:data=>{:worker_method=>"baz", :arg=>"rails"}, :type=>:request, :result=>true, :client_signature=>9}
- @meta_worker.expects(:send_data).with({:data=>"hello : rails", :type=>:response, :result=>true, :client_signature=>9}).returns("hello : rails")
- @meta_worker.receive_internal_data(dump_object(b))
+ @meta_worker.expects(:send_data).with({:data=>"hello : rails", :type=>:response, :result=>true, :client_signature=>9, :result_flag => "ok"}).returns("hello : rails")
+ @meta_worker.receive_internal_data(packet_dump(b))
Thread.current[:job_key].should == nil
end
@@ -90,7 +91,7 @@ def send_data data
b = {:data=> {:worker_method=>"baz", :arg => { :name => "bdrb",:age => 10} }, :type=>:request, :result=>true, :client_signature=>9 }
# @meta_worker.expects(:send_data).with({:data=>"hello : rails", :type=>:response, :result=>true, :client_signature=>9}).returns("hello : rails")
@meta_worker.expects(:baz).with({ :name => "bdrb",:age => 10}).returns("foo")
- @meta_worker.receive_internal_data(dump_object(b))
+ @meta_worker.receive_internal_data(packet_dump(b))
@meta_worker.outgoing_data[:data].should == "foo"
Thread.current[:job_key].should == nil
end
@@ -104,7 +105,7 @@ def send_data data
end
@meta_worker.cache[:start_message] = "helloworld"
c = {:data=>{:job_key=>:start_message}, :type=>:get_result, :result=>true, :client_signature=>9}
- @meta_worker.receive_internal_data(dump_object(c))
+ @meta_worker.receive_internal_data(packet_dump(c))
@meta_worker.t_result[:data].should == "helloworld"
end
@@ -114,15 +115,14 @@ def baz args
proc { "boy"}
end
def send_data input
- dump_object(input)
+ packet_dump(input)
end
end
b = {:data=>{:worker_method=>"baz", :arg=>"rails"}, :type=>:request, :result=>true, :client_signature=>9}
- a = @meta_worker.receive_internal_data(dump_object(b))
- a.should == nil
+ a = @meta_worker.receive_internal_data(packet_dump(b))
+ p a
Thread.current[:job_key].should == nil
end
-
end
context "For unix schedulers" do
@@ -260,7 +260,7 @@ def ivar(var)
specify "should run enqueued tasks with arguments if they are there in the queue" do
@meta_worker = QueueWorker.start_worker
mocked_task = mock()
- mocked_task.expects(:worker_method).returns(:barbar).times(3)
+ mocked_task.expects(:worker_method).returns(:barbar).times(2)
mocked_task.expects(:args).returns(Marshal.dump("hello"))
mocked_task.expects(:[]).returns(1).times(2)
@meta_worker.expects(:barbar).with("hello").returns(true)
@@ -272,7 +272,7 @@ def ivar(var)
@meta_worker = QueueWorker.start_worker
mocked_task = mock()
mocked_task.expects(:[]).returns(1).times(2)
- mocked_task.expects(:worker_method).returns(:barbar).times(3)
+ mocked_task.expects(:worker_method).returns(:barbar).times(2)
mocked_task.expects(:args).returns(nil)
@meta_worker.expects(:barbar)
BdrbJobQueue.expects(:find_next).with("queue_worker").returns(mocked_task)
Please sign in to comment.
Something went wrong with that request. Please try again.