Permalink
Browse files

get result objects correctly

  • Loading branch information...
gnufied committed Jun 20, 2008
1 parent d73d859 commit 336bd72564a3ab3934dd4876896024b1c7f799c6
View
@@ -5,13 +5,14 @@
require "ostruct"
BACKGROUNDRB_ROOT = Pathname.new(RAILS_ROOT).realpath.to_s
+require "backgroundrb/bdrb_config"
BDRB_CONFIG = BackgrounDRb::Config.read_config("#{BACKGROUNDRB_ROOT}/config/backgroundrb.yml")
require "backgroundrb/bdrb_conn_error"
-require "backgroundrb/bdrb_config"
require "backgroundrb/rails_worker_proxy"
require "backgroundrb/bdrb_connection"
require "backgroundrb/bdrb_cluster_connection"
+
MiddleMan = BackgrounDRb::ClusterConnection.new
@@ -6,12 +6,23 @@ class ClusterConnection
def initialize
@bdrb_servers = []
@backend_connections = []
+ initialize_memcache if BDRB_CONFIG[:backgroundrb][:result_storage] == :memcache
establish_connections
@round_robin = (0...@backend_connections.length).to_a
end
def initialize_memcache
-
+ require 'memcache'
+ memcache_options = {
+ :c_threshold => 10_000,
+ :compression => true,
+ :debug => false,
+ :namespace => 'backgroundrb_result_hash',
+ :readonly => false,
+ :urlencode => false
+ }
+ @cache = MemCache.new(memcache_options)
+ @cache.servers = BDRB_CONFIG[:memcache].split(',')
end
# initialize all backend server connections
@@ -27,7 +38,7 @@ def establish_connections
@bdrb_servers << OpenStruct.new(:ip => BDRB_CONFIG[:backgroundrb][:ip],:port => BDRB_CONFIG[:backgroundrb][:port].to_i)
end
@bdrb_servers.each_with_index do |connection_info,index|
- @backend_connections << Connection.custom_connection(connection_info.ip,connection_info.port)
+ @backend_connections << Connection.new(connection_info.ip,connection_info.port,self)
end
end # end of method establish_connections
@@ -1,20 +1,13 @@
module BackgrounDRb
class Connection
- def self.server_ip; @server_ip; end
- def self.server_port; @server_port; end
-
- def server_ip; self.class.server_ip; end
- def server_port; self.class.server_port; end
-
- def self.custom_connection(ip,port)
+ attr_accessor :server_ip,:server_port,:cluster_conn
+ def initialize ip,port,cluster_conn
+ @mutex = Mutex.new
@server_ip = ip
@server_port = port
- new
+ @cluster_conn = cluster_conn
end
- def initialize
- @mutex = Mutex.new
- end
def worker(worker_name,worker_key = nil)
RailsWorkerProxy.worker(worker_name,worker_key,self)
@@ -136,28 +129,28 @@ def read_object
end
def gen_key options
- if BDRB_CONFIG[:backgroundr][:result_storage] == :memcache
- [options[:worker],options[:worker_key]worker_key,options[:job_key]].compact.join('_')
+ if BDRB_CONFIG[:backgroundrb][:result_storage] == :memcache
+ [options[:worker],options[:worker_key],options[:job_key]].compact.join('_')
else
options[:job_key]
end
end
def ask_result(p_data)
- if BDRB_CONFIG[:backgroundr][:result_storage] == :memcache
+ if BDRB_CONFIG[:backgroundrb][:result_storage] == :memcache
return_result_from_memcache(p_data)
else
p_data[:type] = :get_result
dump_object(p_data)
bdrb_response = nil
@mutex.synchronize { bdrb_response = read_from_bdrb() }
close_connection
- bdrb_response
+ return bdrb_response[:data]
end
end
def return_result_from_memcache options = {}
-
+ cluster_conn.cache[gen_key(options)]
end
def read_from_bdrb(timeout = 3)
@@ -16,7 +16,7 @@ def method_missing(method_id,*args)
flag = args[1]
case worker_method
when :ask_result
- middle_man.ask_status(compact(:worker => worker_name,:worker_key => worker_key))
+ middle_man.ask_result(compact(:worker => worker_name,:worker_key => worker_key,:job_key => data))
when :worker_info
middle_man.worker_info(compact(:worker => worker_name,:worker_key => worker_key))
when :delete
@@ -1,4 +1,5 @@
require "chronic"
+require "lib/bdrb_result_storage"
require "lib/bdrb_thread_pool"
require "lib/bdrb_server_helper"
require "lib/master_worker"
@@ -1,6 +1,6 @@
module BackgrounDRb
class ResultStorage
- attr_accessor :cache,:worker_name,:worker_key
+ attr_accessor :cache,:worker_name,:worker_key,:storage_type
def initialize(worker_name,worker_key,storage_type = nil)
@worker_name = worker_name
@worker_key = worker_key
@@ -98,7 +98,8 @@ def run_task task
block_arity = task.block.arity
begin
ActiveRecord::Base.verify_active_connections!
- data = (task.data.is_a?(Array)) ? *(task.data) : task.data
+ #data = (task.data.is_a?(Array)) ? *(task.data) : task.data
+ data = task.data
result = (block_arity == 0 ? task.block.call : task.block.call(data))
return result
rescue
@@ -106,8 +106,15 @@ def get_result_object(t_data)
worker_name_key = gen_worker_key(worker_name,t_data[:worker_key])
t_data.delete(:worker)
t_data.delete(:type)
-
- send_object(result_data)
+ begin
+ ask_worker(worker_name_key,:data => t_data, :type => :get_result,:result => true)
+ rescue Packet::DisconnectError => sock_error
+ reactor.live_workers.delete(worker_name_key)
+ rescue
+ debug_logger.info($!.to_s)
+ debug_logger.info($!.backtrace.join("\n"))
+ return
+ end
end
def method_invoke(t_data)
@@ -86,7 +86,7 @@ def error(p_data)
class MetaWorker < Packet::Worker
include BackgrounDRb::BdrbServerHelper
attr_accessor :config_file, :my_schedule, :run_time, :trigger_type, :trigger
- attr_accessor :logger, :thread_pool
+ attr_accessor :logger, :thread_pool,:cache
iattr_accessor :pool_size
iattr_accessor :reload_flag
View
@@ -25,6 +25,18 @@
cache[job_key] =>
will return the result. worker_name, worker_key shall be taken in to account.
+** Write a Rails controller that returns following information:
+ - For each BackgrounDRb server:
+ - Workers running
+ - Memory taken by them
+ - Number of threads in thread pool
+ - Number of Tasks in Queue
+ - If any worker key
+ - Perhaps a count of open file descriptors
+ - Scheduled tasks and next turn
+ - Overall memory taken by entire BackgrounDRb server.
+
+

0 comments on commit 336bd72

Please sign in to comment.