From d211e3ec2d868b867c83ef57ea3ef567c1c8c5ac Mon Sep 17 00:00:00 2001 From: gnufied Date: Sat, 4 Oct 2008 03:10:54 +0530 Subject: [PATCH] improve start stop managment --- ChangeLog | 4 +++ Rakefile | 10 ++++-- lib/backgroundrb.rb | 2 +- lib/backgroundrb/bdrb_start_stop.rb | 41 ++++++++++++++++++++++++ lib/backgroundrb/rails_worker_proxy.rb | 22 +++++++++++++ script/backgroundrb | 43 ++++---------------------- 6 files changed, 82 insertions(+), 40 deletions(-) create mode 100644 lib/backgroundrb/bdrb_start_stop.rb diff --git a/ChangeLog b/ChangeLog index 47b94cd..eb5c720 100644 --- a/ChangeLog +++ b/ChangeLog @@ -1,5 +1,9 @@ 2008-09-07 hemant kumar + * Fix environment loading issues. + + * Patch by Kieran P for disabling persistent job queue and its polling. + * Commit patch by P Baker, related to scheduling a persistent task at specified time. For example: MiddleMan(:hello_worker).enq_some_task(:arg => "hello_world", :job_key => "boy",:scheduled_at => (Time.now + 1.hour)) diff --git a/Rakefile b/Rakefile index 29c30ca..38c8f2b 100644 --- a/Rakefile +++ b/Rakefile @@ -4,6 +4,7 @@ require 'rake/testtask' require 'rake/rdoctask' require 'spec/rake/spectask' require 'rake/contrib/sshpublisher' +require "darkfish-rdoc" desc 'Default: run unit tests.' task :default => :test @@ -31,14 +32,19 @@ desc 'Generate documentation for the backgroundrb plugin.' Rake::RDocTask.new(:rdoc) do |rdoc| rdoc.rdoc_dir = 'doc/output/manual' rdoc.title = 'Backgroundrb' - rdoc.options << '--line-numbers' << '--inline-source' + #rdoc.options << '--line-numbers' << '--inline-source' rdoc.rdoc_files.include('README') rdoc.rdoc_files.include('LICENSE') rdoc.rdoc_files.include('lib/*.rb') rdoc.rdoc_files.include('lib/backgroundrb/*.rb') rdoc.rdoc_files.include('server/*.rb') rdoc.rdoc_files.include('server/lib/*.rb') - rdoc.template = 'jamis' + #rdoc.template = 'jamis' + rdoc.options += [ + '-SHN', + '-f', 'darkfish', # This is the important bit + ] + end module Rake diff --git a/lib/backgroundrb.rb b/lib/backgroundrb.rb index 4ad7ea5..fa9acb2 100644 --- a/lib/backgroundrb.rb +++ b/lib/backgroundrb.rb @@ -16,7 +16,7 @@ require "backgroundrb/rails_worker_proxy" require "backgroundrb/bdrb_connection" require "backgroundrb/bdrb_cluster_connection" - +require "backgroundrb/bdrb_start_stop" MiddleMan = BackgrounDRb::ClusterConnection.new diff --git a/lib/backgroundrb/bdrb_start_stop.rb b/lib/backgroundrb/bdrb_start_stop.rb new file mode 100644 index 0000000..7293bae --- /dev/null +++ b/lib/backgroundrb/bdrb_start_stop.rb @@ -0,0 +1,41 @@ +module BackgrounDRb + class StartStop + def kill_process arg_pid_file + pid = nil + pid = File.open(arg_pid_file, "r") { |pid_handle| pid_handle.gets.strip.chomp.to_i } + pgid = Process.getpgid(pid) + puts "Stopping BackgrounDRb with pid #{pid}...." + Process.kill('-TERM', pgid) + File.delete(arg_pid_file) if File.exists?(arg_pid_file) + puts "Success!" + end + def running?; File.exists?(PID_FILE); end + + def start + if fork + sleep(5) + exit(0) + else + if running? + puts "pid file already exists, exiting..." + exit(-1) + end + puts "Starting BackgrounDRb .... " + op = File.open(PID_FILE, "w") + op.write(Process.pid().to_s) + op.close + if BDRB_CONFIG[:backgroundrb][:log].nil? or BDRB_CONFIG[:backgroundrb][:log] != 'foreground' + log_file = File.open(SERVER_LOGGER,"w+") + [STDIN, STDOUT, STDERR].each {|desc| desc.reopen(log_file)} + end + + BackgrounDRb::MasterProxy.new() + end + end + + def stop + pid_files = Dir["#{RAILS_HOME}/tmp/pids/backgroundrb_*.pid"] + pid_files.each { |x| kill_process(x) } + end + end +end diff --git a/lib/backgroundrb/rails_worker_proxy.rb b/lib/backgroundrb/rails_worker_proxy.rb index 0205f07..0702468 100644 --- a/lib/backgroundrb/rails_worker_proxy.rb +++ b/lib/backgroundrb/rails_worker_proxy.rb @@ -1,7 +1,9 @@ module BackgrounDRb + # A Worker proxy, which uses +method_missing+ for delegating method calls to the workers class RailsWorkerProxy attr_accessor :worker_name, :worker_method, :data, :worker_key,:middle_man + # create new worker proxy def initialize(p_worker_name,p_worker_key = nil,p_middle_man = nil) @worker_name = p_worker_name @middle_man = p_middle_man @@ -40,14 +42,17 @@ def method_missing(method_id,*args) end end + # enqueue tasks to the worker pool def enqueue_task options = {} BdrbJobQueue.insert_job(options) end + # remove tasks from the worker pool def dequeue_task options = {} BdrbJobQueue.remove_job(options) end + # invoke method on worker def run_method host_info,method_name,worker_options = {} result = [] connection = choose_connection(host_info) @@ -75,11 +80,14 @@ def run_method host_info,method_name,worker_options = {} return_result(result) end + # choose a backgroundrb server connection and invoke worker method on it. def invoke_on_connection connection,method_name,options = {} raise NoServerAvailable.new("No BackgrounDRb is found running") unless connection connection.send(method_name,options) end + # get results back from the cache. Cache can be in-memory worker cache or memcache + # based cache def ask_result job_key options = compact(:worker => worker_name,:worker_key => worker_key,:job_key => job_key) if BDRB_CONFIG[:backgroundrb][:result_storage] == 'memcache' @@ -90,26 +98,38 @@ def ask_result job_key end end + # return runtime information about worker def worker_info t_connections = middle_man.backend_connections result = t_connections.map { |conn| conn.worker_info(compact(:worker => worker_name,:worker_key => worker_key)) } return_result(result) end + # generate worker key def gen_key options key = [options[:worker],options[:worker_key],options[:job_key]].compact.join('_') key end + # return result from memcache def return_result_from_memcache options = {} middle_man.cache[gen_key(options)] end + # reset result within memcache for given key + def reset_memcache_result(job_key,value) + options = compact(:worker => worker_name,:worker_key => worker_key,:job_key => job_key) + key = gen_key(options) + middle_man.cache[key] = value + value + end + def return_result result result = Array(result) result.size <= 1 ? result[0] : result end + # delete a worker def delete middle_man.backend_connections.each do |connection| connection.delete_worker(compact(:worker => worker_name, :worker_key => worker_key)) @@ -117,6 +137,7 @@ def delete return worker_key end + # choose a worker def choose_connection host_info case host_info when :all; middle_man.backend_connections @@ -126,6 +147,7 @@ def choose_connection host_info end end + # helper method to compact a hash and for getting rid of nil parameters def compact(options = { }) options.delete_if { |key,value| value.nil? } options diff --git a/script/backgroundrb b/script/backgroundrb index dabf80b..9be048a 100755 --- a/script/backgroundrb +++ b/script/backgroundrb @@ -26,45 +26,14 @@ require RAILS_HOME + "/config/environment" require "bdrb_job_queue" require "backgroundrb_server" -pid_file = "#{RAILS_HOME}/tmp/pids/backgroundrb_#{BDRB_CONFIG[:backgroundrb][:port]}.pid" +PID_FILE = "#{RAILS_HOME}/tmp/pids/backgroundrb_#{BDRB_CONFIG[:backgroundrb][:port]}.pid" SERVER_LOGGER = "#{RAILS_HOME}/log/backgroundrb_debug_#{BDRB_CONFIG[:backgroundrb][:port]}.log" -case ARGV[0] -when 'start' - if fork - sleep(5) - exit - else - op = File.open(pid_file, "w") - op.write(Process.pid().to_s) - op.close - if BDRB_CONFIG[:backgroundrb][:log].nil? or BDRB_CONFIG[:backgroundrb][:log] != 'foreground' - log_file = File.open(SERVER_LOGGER,"w+") - [STDIN, STDOUT, STDERR].each {|desc| desc.reopen(log_file)} - end +daemon = BackgrounDRb::StartStop.new - BackgrounDRb::MasterProxy.new() - end -when 'stop' - def kill_process arg_pid_file - pid = nil - File.open(arg_pid_file, "r") { |pid_handle| pid = pid_handle.gets.strip.chomp.to_i } - begin - pgid = Process.getpgid(pid) - Process.kill('TERM', pid) - Process.kill('-TERM', pgid) - Process.kill('KILL', pid) - rescue Errno::ESRCH => e - puts "Deleting pid file" - rescue - puts $! - ensure - File.delete(arg_pid_file) if File.exists?(arg_pid_file) - end - end - pid_files = Dir["#{RAILS_HOME}/tmp/pids/backgroundrb_*.pid"] - pid_files.each { |x| kill_process(x) } -else - BackgrounDRb::MasterProxy.new() +case ARGV[0] +when 'start'; daemon.start +when 'stop'; daemon.stop() +else; BackgrounDRb::MasterProxy.new() end