Permalink
Browse files

check in new thread pool changes and per worker configuration

  • Loading branch information...
1 parent ee14450 commit 89f70dbdd43c22cff227ffb1e84f1686a6bd4c72 @gnufied committed Mar 28, 2009
@@ -43,14 +43,34 @@ def start
op.write(Process.pid().to_s)
op.close
if BDRB_CONFIG[:backgroundrb][:log].nil? or BDRB_CONFIG[:backgroundrb][:log] != 'foreground'
- log_file = File.open(SERVER_LOGGER,(File::WRONLY | File::APPEND | File::CREAT))
- log_file.sync = true
- [STDOUT, STDERR].each {|desc| desc.reopen(log_file)}
+ redirect_io(SERVER_LOGGER)
end
BackgrounDRb::MasterProxy.new()
end
end
+ # Free file descriptors and
+ # point them somewhere sensible
+ # STDOUT/STDERR should go to a logfile
+ def redirect_io(logfile_name)
+ begin; STDIN.reopen "/dev/null"; rescue ::Exception; end
+
+ if logfile_name
+ begin
+ STDOUT.reopen logfile_name, "a"
+ STDOUT.sync = true
+ rescue ::Exception
+ begin; STDOUT.reopen "/dev/null"; rescue ::Exception; end
+ end
+ else
+ begin; STDOUT.reopen "/dev/null"; rescue ::Exception; end
+ end
+
+ begin; STDERR.reopen STDOUT; rescue ::Exception; end
+ STDERR.sync = true
+ end
+
+
def stop
pid_files = Dir["#{RAILS_HOME}/tmp/pids/backgroundrb_*.pid"]
pid_files.each { |x| kill_process(x) }
@@ -16,6 +16,10 @@ def method_missing(method_id,*args)
arguments = args.first
arg,job_key,host_info,scheduled_at = arguments && arguments.values_at(:arg,:job_key,:host,:scheduled_at)
+
+ # allow both arg and args
+ arg ||= arguments[:args]
+
new_schedule = (scheduled_at && scheduled_at.respond_to?(:utc)) ? scheduled_at.utc : Time.now.utc
if worker_method =~ /^async_(\w+)/
@@ -76,7 +76,8 @@ def add_thread
block_result = run_task(task)
end
rescue BackgrounDRb::InterruptedException
- logger.info("BackgrounDRb thread interrupted: #{Thread.current.inspect}")
+ STDERR.puts("BackgrounDRb thread interrupted: #{Thread.current.inspect}")
+ STDERR.flush
end
end
end
@@ -86,7 +87,7 @@ def add_thread
def run_task task
block_arity = master.method(task.job_method).arity
begin
- ActiveRecord::Base.verify_active_connections!
+ check_db_connection
t_data = task.args
result = nil
if block_arity != 0
@@ -98,12 +99,29 @@ def run_task task
rescue BackgrounDRb::InterruptedException => e
# Don't log, just re-raise
raise e
- rescue
- logger.info($!.to_s)
- logger.info($!.backtrace.join("\n"))
+ rescue Object => bdrb_error
+ log_exception(bdrb_error)
return nil
end
end
+
+ def log_exception exception_object
+ STDERR.puts exception_object.to_s
+ STDERR.puts exception_object.backtrace.join("\n")
+ STDERR.flush
+ end
+
+
+ # Periodic check for lost database connections and closed connections
+ def check_db_connection
+ begin
+ ActiveRecord::Base.verify_active_connections! if defined?(ActiveRecord)
+ rescue Object => bdrb_error
+ log_exception(bdrb_error)
+ end
+ end
+
+
end #end of class ThreadPool
end # end of module BackgrounDRb
View
@@ -108,59 +108,71 @@ def worker_init
raise "Invalid worker name" if !worker_name
Thread.abort_on_exception = true
- log_flag = BDRB_CONFIG[:backgroundrb][:debug_log].nil? ? true : BDRB_CONFIG[:backgroundrb][:debug_load_rails_env]
-
# stores the job key of currently running job
Thread.current[:job_key] = nil
+ initialize_logger
+
+ @thread_pool = ThreadPool.new(self,pool_size || 20,@logger)
+ t_worker_key = worker_options && worker_options[:worker_key]
+
+ @cache = ResultStorage.new(worker_name,t_worker_key,BDRB_CONFIG[:backgroundrb][:result_storage])
+
+ if(worker_options && worker_options[:schedule] && no_auto_load)
+ load_schedule_from_args
+ elsif(BDRB_CONFIG[:schedules] && BDRB_CONFIG[:schedules][worker_name.to_sym])
+ @my_schedule = BDRB_CONFIG[:schedules][worker_name.to_sym]
+ new_load_schedule if @my_schedule
+ end
+ if respond_to?(:create)
+ invoke_user_method(:create,worker_options[:data])
+ end
+ if run_persistent_jobs?
+ add_periodic_timer(persistent_delay.to_i) { check_for_enqueued_tasks }
+ end
+ end
+
+ def initialize_logger
+ log_flag = BDRB_CONFIG[:backgroundrb][:debug_log].nil? ? true : BDRB_CONFIG[:backgroundrb][:debug_load_rails_env]
if BDRB_CONFIG[:backgroundrb][:logging_logger].nil?
@logger = PacketLogger.new(self,log_flag)
else
log_config = BDRB_CONFIG[:backgroundrb][:logging_logger]
@logger = Logging::Logger[log_config[:name]]
@logger.trace = log_config[:trace]
@logger.additive = log_config[:additive]
+
log_config[:appenders].keys.each do |key|
appender_config = log_config[:appenders][key]
- appender = "Logging::Appenders::#{appender_config[:type]}".constantize.new("backgroundrb_#{key}",
+
+ logger_options = {
:filename => "#{RAILS_HOME}/#{appender_config[:filename]}",
:age => appender_config[:age],
:size => appender_config[:size],
:keep => appender_config[:keep],
:safe => appender_config[:safe],
- :layout => Logging::Layouts::Pattern.new(:pattern => appender_config[:layout_pattern]))
+ :layout => Logging::Layouts::Pattern.new(:pattern => appender_config[:layout_pattern])
+ }
+ appender = "Logging::Appenders::#{appender_config[:type]}".constantize.new("backgroundrb_#{key}",logger_options)
@logger.add_appenders(appender)
end
end
- @thread_pool = ThreadPool.new(self,pool_size || 20,@logger)
- t_worker_key = worker_options && worker_options[:worker_key]
-
- @cache = ResultStorage.new(worker_name,t_worker_key,BDRB_CONFIG[:backgroundrb][:result_storage])
+ end
- if(worker_options && worker_options[:schedule] && no_auto_load)
- load_schedule_from_args
- elsif(BDRB_CONFIG[:schedules] && BDRB_CONFIG[:schedules][worker_name.to_sym])
- @my_schedule = BDRB_CONFIG[:schedules][worker_name.to_sym]
- new_load_schedule if @my_schedule
- end
- if respond_to?(:create)
- create_arity = method(:create).arity
- (create_arity == 0) ? create : create(worker_options[:data])
- end
- if run_persistent_jobs?
- add_periodic_timer(persistent_delay.to_i) { check_for_enqueued_tasks }
- end
+ def puts msg
+ STDOUT.puts msg
+ STDOUT.flush
end
# Returns the persistent job queue check delay for this worker
def persistent_delay
get_config_value(:persistent_delay, 5)
end
-
+
# Returns true if persistent jobs should be run for this worker.
def run_persistent_jobs?
!get_config_value(:persistent_disabled, false)
end
-
+
# return job key from thread global variable
def job_key; Thread.current[:job_key]; end
@@ -212,25 +224,16 @@ def process_request(p_data)
user_input = p_data[:data]
if (user_input[:worker_method]).nil? or !respond_to?(user_input[:worker_method])
result = nil
+ puts "Trying to invoke invalid worker method on worker #{worker_name}"
send_response(p_data,result)
return
end
- called_method_arity = self.method(user_input[:worker_method]).arity
result = nil
Thread.current[:job_key] = user_input[:job_key]
- begin
- if called_method_arity != 0
- result = self.send(user_input[:worker_method],user_input[:arg])
- else
- result = self.send(user_input[:worker_method])
- end
- rescue
- logger.info($!.to_s)
- logger.info($!.backtrace.join("\n"))
- end
+ result = invoke_user_method(user_input[:worker_method],user_input[:arg])
if p_data[:result]
result = "dummy_result" if result.nil?
@@ -273,19 +276,40 @@ def send_response input,output
input[:type] = :response
begin
send_data(input)
- rescue TypeError => e
- logger.info(e.to_s)
- logger.info(e.backtrace.join("\n"))
- input[:data] = "invalid_result_dump_check_log"
- send_data(input)
- rescue
- logger.info($!.to_s)
- logger.info($!.backtrace.join("\n"))
+ rescue Object => bdrb_error
+ log_exception(bdrb_error)
input[:data] = "invalid_result_dump_check_log"
send_data(input)
end
end
+ def log_exception exception_object
+ STDERR.puts exception_object.to_s
+ STDERR.puts exception_object.backtrace.join("\n")
+ STDERR.flush
+ end
+
+ def invoke_user_method user_method,args
+ if self.respond_to?(user_method)
+ called_method_arity = self.method(user_method).arity
+ t_result = nil
+ begin
+ if(called_method_arity != 0)
+ t_result = self.send(user_method,args)
+ else
+ t_result = self.send(user_method)
+ end
+ rescue Object => bdrb_error
+ puts "Error calling method #{user_method} with #{args} on worker #{worker_name}"
+ log_exception(bdrb_error)
+ end
+ t_result
+ else
+ puts "Trying to invoke method #{user_method} with #{args} on worker #{worker_name} failed because no such method is defined on the worker"
+ nil
+ end
+ end
+
# called when connection is closed
def unbind; end
@@ -297,18 +321,8 @@ def check_for_enqueued_tasks
if self.respond_to? task.worker_method
Thread.current[:persistent_job_id] = task[:id]
Thread.current[:job_key] = task[:job_key]
- called_method_arity = self.method(task.worker_method).arity
args = load_data(task.args)
- begin
- if called_method_arity != 0
- self.send(task.worker_method,args)
- else
- self.send(task.worker_method)
- end
- rescue
- logger.info($!.to_s)
- logger.info($!.backtrace.join("\n"))
- end
+ invoke_user_method(task.worker_method,task.args)
else
task.release_job
end
@@ -337,12 +351,7 @@ def check_for_timer_events
time_now = Time.now.to_i
if value[:runtime] < time_now
check_db_connection
- begin
- (t_data = value[:data]) ? send(key,t_data) : send(key)
- rescue
- logger.info($!.to_s)
- logger.info($!.backtrace.join("\n"))
- end
+ invoke_user_method(key,value[:data])
t_time = value[:trigger].fire_after_time(Time.now)
value[:runtime] = t_time.to_i
end
@@ -353,9 +362,8 @@ def check_for_timer_events
def check_db_connection
begin
ActiveRecord::Base.verify_active_connections! if defined?(ActiveRecord)
- rescue
- logger.info($!.to_s)
- logger.info($!.backtrace.join("\n"))
+ rescue Object => bdrb_error
+ log_exception(bdrb_error)
end
end
@@ -370,7 +378,7 @@ def worker_config
{}
end
end
-
+
# Returns the appropriate configuration value, based on both the
# global config and the per-worker configuration for this worker.
def get_config_value(key_sym, default)
@@ -382,7 +390,7 @@ def get_config_value(key_sym, default)
default
end
end
-
+
def load_rails_env
db_config_file = YAML.load(ERB.new(IO.read("#{RAILS_HOME}/config/database.yml")).result)
run_env = ENV["RAILS_ENV"]

0 comments on commit 89f70db

Please sign in to comment.