Permalink
Browse files

check new refactored code master proxy

  • Loading branch information...
1 parent 25f0426 commit c30ffca3986ca40a45cad3176f7cd66e1cc45ff4 @gnufied committed Jun 20, 2008
Showing with 128 additions and 2 deletions.
  1. +1 −0 server/backgroundrb_server.rb
  2. +123 −0 server/lib/master_proxy.rb
  3. +4 −1 server/lib/master_worker.rb
  4. +0 −1 test/server/test_master_worker.rb
@@ -3,6 +3,7 @@
require "lib/bdrb_thread_pool"
require "lib/bdrb_server_helper"
require "lib/master_worker"
+require "lib/master_proxy"
require "lib/cron_trigger"
require "lib/invalid_dump_error"
require "lib/log_worker"
@@ -0,0 +1,123 @@
+module BackgrounDRb
+ class MasterProxy
+ attr_accessor :reloadable_workers,:worker_triggers,:reactor
+ def initialize
+ raise "Running old Ruby version, upgrade to Ruby >= 1.8.5" unless check_for_ruby_version
+
+ log_flag = BDRB_CONFIG[:backgroundrb][:debug_log].nil? ? true : BDRB_CONFIG[:backgroundrb][:debug_log]
+ debug_logger = DebugMaster.new(BDRB_CONFIG[:backgroundrb][:log],log_flag)
+
+ load_rails_env
+
+ find_reloadable_worker
+
+ Packet::Reactor.run do |t_reactor|
+ @reactor = t_reactor
+ t_reactor.start_worker(:worker => :log_worker) if log_flag
+ t_reactor.start_server(BDRB_CONFIG[:backgroundrb][:ip],
+ BDRB_CONFIG[:backgroundrb][:port],MasterWorker) do |conn|
+ conn.debug_logger = debug_logger
+ end
+ t_reactor.next_turn { reload_workers }
+ end
+ end
+
+ def gen_worker_key(worker_name,worker_key = nil)
+ return worker_name if worker_key.nil?
+ return "#{worker_name}_#{worker_key}".to_sym
+ end
+
+
+ # method should find reloadable workers and load their schedule from config file
+ def find_reloadable_worker
+ t_workers = Dir["#{WORKER_ROOT}/**/*.rb"]
+ @reloadable_workers = t_workers.map do |x|
+ worker_name = File.basename(x,".rb")
+ require worker_name
+ worker_klass = Object.const_get(worker_name.classify)
+ worker_klass.reload_flag ? worker_klass : nil
+ end.compact
+ @worker_triggers = { }
+ @reloadable_workers.each do |t_worker|
+ schedule = load_reloadable_schedule(t_worker)
+ if schedule && !schedule.empty?
+ @worker_triggers[t_worker.worker_name.to_sym] = schedule
+ end
+ end
+ end
+
+ def load_reloadable_schedule(t_worker)
+ worker_method_triggers = { }
+ worker_schedule = BDRB_CONFIG[:schedules][t_worker.worker_name.to_sym]
+
+ worker_schedule && worker_schedule.each do |key,value|
+ case value[:trigger_args]
+ when String
+ cron_args = value[:trigger_args] || "0 0 0 0 0"
+ trigger = BackgrounDRb::CronTrigger.new(cron_args)
+ when Hash
+ trigger = BackgrounDRb::Trigger.new(value[:trigger_args])
+ end
+ worker_method_triggers[key] = {
+ :trigger => trigger,:data => value[:data],
+ :runtime => trigger.fire_after_time(Time.now).to_i
+ }
+ end
+ worker_method_triggers
+ end
+
+ # method will reload workers that should be loaded on each schedule
+ def reload_workers
+ return if worker_triggers.empty?
+ worker_triggers.each do |key,value|
+ value.delete_if { |key,value| value[:trigger].respond_to?(:end_time) && value[:trigger].end_time <= Time.now }
+ end
+
+ worker_triggers.each do |worker_name,trigger|
+ trigger.each do |key,value|
+ time_now = Time.now.to_i
+ if value[:runtime] < time_now
+ load_and_invoke(worker_name,key,value)
+ t_time = value[:trigger].fire_after_time(Time.now)
+ value[:runtime] = t_time.to_i
+ end
+ end
+ end
+ end
+
+ # method will load the worker and invoke worker method
+ def load_and_invoke(worker_name,p_method,data)
+ begin
+ require worker_name.to_s
+ worker_key = Packet::Guid.hexdigest
+ @reactor.start_worker(:worker => worker_name,:worker_key => worker_key)
+ worker_name_key = gen_worker_key(worker_name,worker_key)
+ data_request = {:data => { :worker_method => p_method,:data => data[:data]},
+ :type => :request, :result => false
+ }
+
+ exit_request = {:data => { :worker_method => :exit},
+ :type => :request, :result => false
+ }
+
+ @reactor.live_workers[worker_name_key].send_request(data_request)
+ @reactor.live_workers[worker_name_key].send_request(exit_request)
+ rescue LoadError
+ puts "no such worker #{worker_name}"
+ rescue MissingSourceFile
+ puts "no such worker #{worker_name}"
+ return
+ end
+ end
+
+ def load_rails_env
+ db_config_file = YAML.load(ERB.new(IO.read("#{RAILS_HOME}/config/database.yml")).result)
+ run_env = ENV["RAILS_ENV"]
+ ActiveRecord::Base.establish_connection(db_config_file[run_env])
+ ActiveRecord::Base.allow_concurrency = true
+ end
+
+ def check_for_ruby_version; RUBY_VERSION >= "1.8.5"; end
+ end # end of module BackgrounDRb
+end
+
@@ -204,7 +204,10 @@ def load_reloadable_schedule(t_worker)
when Hash
trigger = BackgrounDRb::Trigger.new(value[:trigger_args])
end
- worker_method_triggers[key] = { :trigger => trigger,:data => value[:data],:runtime => trigger.fire_after_time(Time.now).to_i }
+ worker_method_triggers[key] = {
+ :trigger => trigger,:data => value[:data],
+ :runtime => trigger.fire_after_time(Time.now).to_i
+ }
end
worker_method_triggers
end
@@ -26,7 +26,6 @@
end
specify "load schedule should load schedule of worker specified" do
- #require "bar_worker"
@master_proxy.load_reloadable_schedule(BarWorker).should.not.be { }
end

0 comments on commit c30ffca

Please sign in to comment.