Permalink
Browse files

Merge commit 'ezmobius/master'

  • Loading branch information...
2 parents 75729d5 + 491ae82 commit 4074f9c70007bedc4ab9ee617b325861ca39e8a9 Chris Gaffney committed Aug 8, 2009
View
@@ -10,6 +10,7 @@ vendor
rdoc
config.yml
spec/config.yml
+spec/nanite.*.log
*.tmproj
nbproject
.idea
View
@@ -37,6 +37,11 @@ opts = OptionParser.new do |opts|
opts.on("--single-threaded", "Run all operations in one thread") do
options[:single_threaded] = true
end
+
+ opts.on("--threadpool COUNT", "Number of threads to run all operations in") do |tps|
+ options[:threadpool_size] = tps
+ end
+
end
opts.parse!
@@ -8,7 +8,9 @@ def initialize
def register(actor, prefix)
raise ArgumentError, "#{actor.inspect} is not a Nanite::Actor subclass instance" unless Nanite::Actor === actor
- Nanite::Log.info("Registering #{actor.inspect} with prefix #{prefix.inspect}")
+ log_msg = "[actor] #{actor.class.to_s}"
+ log_msg += ", prefix #{prefix}" if prefix && !prefix.empty?
+ Nanite::Log.info(log_msg)
prefix ||= actor.class.default_prefix
actors[prefix.to_s] = actor
end
View
@@ -57,6 +57,8 @@ class Agent
#
# single_threaded: Run all operations in one thread
#
+ # threadpool_size: Number of threads to run operations in
+ #
# Connection options:
#
# vhost : AMQP broker vhost that should be used
@@ -161,33 +163,35 @@ def load_actors
actors = @options[:actors]
Dir["#{actors_dir}/*.rb"].each do |actor|
next if actors && !actors.include?(File.basename(actor, ".rb"))
- Nanite::Log.info("loading actor: #{actor}")
+ Nanite::Log.info("[setup] loading #{actor}")
require actor
end
init_path = @options[:initrb] || File.join(options[:root], 'init.rb')
instance_eval(File.read(init_path), init_path) if File.exist?(init_path)
end
def receive(packet)
+ Nanite::Log.debug("RECV #{packet.to_s}")
case packet
when Advertise
- Nanite::Log.debug("handling Advertise: #{packet.inspect}")
+ Nanite::Log.info("RECV #{packet.to_s}") unless Nanite::Log.level == Logger::DEBUG
advertise_services
when Request, Push
- Nanite::Log.debug("handling Request: #{packet.inspect}")
if @security && !@security.authorize(packet)
+ Nanite::Log.warn("RECV NOT AUTHORIZED #{packet.to_s}")
if packet.kind_of?(Request)
r = Result.new(packet.token, packet.reply_to, @deny_token, identity)
amq.queue(packet.reply_to, :no_declare => options[:secure]).publish(serializer.dump(r))
end
else
+ Nanite::Log.info("RECV #{packet.to_s([:from, :tags])}") unless Nanite::Log.level == Logger::DEBUG
dispatcher.dispatch(packet)
end
when Result
- Nanite::Log.debug("handling Result: #{packet.inspect}")
+ Nanite::Log.info("RECV #{packet.to_s([])}") unless Nanite::Log.level == Logger::DEBUG
@mapper_proxy.handle_result(packet)
when IntermediateMessage
- Nanite::Log.debug("handling Intermediate Result: #{packet.inspect}")
+ Nanite::Log.info("RECV #{packet.to_s([])}") unless Nanite::Log.level == Logger::DEBUG
@mapper_proxy.handle_intermediate_result(packet)
end
end
@@ -201,10 +205,9 @@ def setup_queue
amq.queue(identity, :durable => true).subscribe(:ack => true) do |info, msg|
begin
info.ack
- packet = serializer.load(msg)
- receive(packet)
+ receive(serializer.load(msg))
rescue Exception => e
- Nanite::Log.error("Error handling packet: #{e.message}")
+ Nanite::Log.error("RECV #{e.message}")
end
end
end
@@ -234,13 +237,15 @@ def setup_traps
def un_register
unless @unregistered
@unregistered = true
+ Nanite::Log.info("SEND [un_register]")
amq.fanout('registration', :no_declare => options[:secure]).publish(serializer.dump(UnRegister.new(identity)))
end
end
def advertise_services
- Nanite::Log.debug("advertise_services: #{registry.services.inspect}")
- amq.fanout('registration', :no_declare => options[:secure]).publish(serializer.dump(Register.new(identity, registry.services, status_proc.call, self.tags)))
+ reg = Register.new(identity, registry.services, status_proc.call, self.tags)
+ Nanite::Log.info("SEND #{reg.to_s}")
+ amq.fanout('registration', :no_declare => options[:secure]).publish(serializer.dump(reg))
end
def parse_uptime(up)
View
@@ -29,19 +29,19 @@ def register(reg)
case reg
when Register
if @security.authorize_registration(reg)
+ Nanite::Log.info("RECV #{reg.to_s}")
nanites[reg.identity] = { :services => reg.services, :status => reg.status, :tags => reg.tags }
reaper.timeout(reg.identity, agent_timeout + 1) { nanite_timed_out(reg.identity) }
callbacks[:register].call(reg.identity, mapper) if callbacks[:register]
- Nanite::Log.info("registered: #{reg.identity}, #{nanites[reg.identity].inspect}")
else
- Nanite::Log.warning("registration of #{reg.inspect} not authorized")
+ Nanite::Log.warn("RECV NOT AUTHORIZED #{reg.to_s}")
end
when UnRegister
+ Nanite::Log.info("RECV #{reg.to_s}")
nanites.delete(reg.identity)
callbacks[:unregister].call(reg.identity, mapper) if callbacks[:unregister]
- Nanite::Log.info("un-registering: #{reg.identity}")
else
- Nanite::Log.warning("Registration received an invalid packet type: #{reg.class}")
+ Nanite::Log.warn("RECV [register] Invalid packet type: #{reg.class}")
end
end
@@ -60,6 +60,7 @@ def publish(request, target)
begin
old_target = request.target
request.target = target unless target == 'mapper-offline'
+ Nanite::Log.info("SEND #{request.to_s([:from, :tags, :target])}")
amq.queue(target).publish(serializer.dump(request), :persistent => request.persistent)
ensure
request.target = old_target
@@ -76,14 +77,19 @@ def handle_ping(ping)
nanite[:status] = ping.status
reaper.reset_with_autoregister_hack(ping.identity, agent_timeout + 1) { nanite_timed_out(ping.identity) }
else
- amq.queue(ping.identity).publish(serializer.dump(Advertise.new))
+ packet = Advertise.new
+ Nanite::Log.info("SEND #{packet.to_s} to #{ping.identity}")
+ amq.queue(ping.identity).publish(serializer.dump(packet))
end
end
end
# forward request coming from agent
def handle_request(request)
if @security.authorize_request(request)
+ Nanite::Log.info("RECV #{request.to_s([:from, :target, :tags])}") unless Nanite::Log.level == Logger::DEBUG
+ Nanite::Log.debug("RECV #{request.to_s}")
+
intm_handler = lambda do |result, job|
result = IntermediateMessage.new(request.token, job.request.from, mapper.identity, nil, result)
forward_response(result, request.persistent)
@@ -99,12 +105,13 @@ def handle_request(request)
forward_response(result, request.persistent)
end
else
- Nanite::Log.warning("request #{request.inspect} not authorized")
+ Nanite::Log.warn("RECV NOT AUTHORIZED #{request.to_s}")
end
end
# forward response back to agent that originally made the request
def forward_response(res, persistent)
+ Nanite::Log.info("SEND #{res.to_s([:to])}")
amq.queue(res.to).publish(serializer.dump(res), :persistent => persistent)
end
@@ -157,10 +164,10 @@ def setup_heartbeat_queue
handler = lambda do |ping|
begin
ping = serializer.load(ping)
- Nanite::Log.debug("got heartbeat from #{ping.identity}") if ping.respond_to?(:identity)
+ Nanite::Log.debug("RECV #{ping.to_s}") if ping.respond_to?(:to_s)
handle_ping(ping)
rescue Exception => e
- Nanite::Log.error("Error handling heartbeat: #{e.message}")
+ Nanite::Log.error("RECV [ping] #{e.message}")
end
end
hb_fanout = amq.fanout('heartbeat', :durable => true)
@@ -174,11 +181,9 @@ def setup_heartbeat_queue
def setup_registration_queue
handler = lambda do |msg|
begin
- msg = serializer.load(msg)
- Nanite::Log.debug("got registration from #{msg.identity}")
- register(msg)
+ register(serializer.load(msg))
rescue Exception => e
- Nanite::Log.error("Error handling registration: #{e.message}")
+ Nanite::Log.error("RECV [register] #{e.message}")
end
end
reg_fanout = amq.fanout('registration', :durable => true)
@@ -192,11 +197,9 @@ def setup_registration_queue
def setup_request_queue
handler = lambda do |msg|
begin
- msg = serializer.load(msg)
- Nanite::Log.debug("got request from #{msg.from} of type #{msg.type}")
- handle_request(msg)
+ handle_request(serializer.load(msg))
rescue Exception => e
- Nanite::Log.error("Error handling request: #{e.message}")
+ Nanite::Log.error("RECV [request] #{e.message}")
end
end
req_fanout = amq.fanout('request', :durable => true)
@@ -212,7 +215,7 @@ def setup_state
when String
# backwards compatibility, we assume redis if the configuration option
# was a string
- Nanite::Log.info("using redis for state storage")
+ Nanite::Log.info("[setup] using redis for state storage")
require 'nanite/state'
@nanites = Nanite::State.new(@state)
when Hash
View
@@ -10,6 +10,7 @@ def initialize(amq, registry, serializer, identity, options)
@identity = identity
@options = options
@evmclass = EM
+ @evmclass.threadpool_size = @options[:threadpool_size].to_i || 20
end
def dispatch(deliverable)
@@ -31,12 +32,13 @@ def dispatch(deliverable)
callback = lambda do |r|
if deliverable.kind_of?(Request)
r = Result.new(deliverable.token, deliverable.reply_to, r, identity)
+ Nanite::Log.info("SEND #{r.to_s([])}")
amq.queue(deliverable.reply_to, :no_declare => options[:secure]).publish(serializer.dump(r))
end
r # For unit tests
end
- if @options[:single_threaded]
+ if @options[:single_threaded] || @options[:thread_poolsize] == 1
@evmclass.next_tick { callback.call(operation.call) }
else
@evmclass.defer(operation, callback)
View
@@ -14,8 +14,6 @@ def new_job(request, targets, inthandler = nil, blk = nil)
end
def process(msg)
- Nanite::Log.debug("processing message: #{msg.inspect}")
-
if job = jobs[msg.token]
job.process(msg)
View
@@ -30,7 +30,8 @@ def init(identity = nil, path = false)
def level=(loglevel)
init() unless @logger
loglevel = loglevel.intern if loglevel.is_a?(String)
- @logger.info("Setting log level to #{loglevel.to_s.upcase}")
+ @logger.info("[setup] setting log level to #{loglevel.to_s.upcase}")
+ @level = loglevel
case loglevel
when :debug
@logger.level = Logger::DEBUG
@@ -46,7 +47,7 @@ def level=(loglevel)
raise ArgumentError, "Log level must be one of :debug, :info, :warn, :error, or :fatal"
end
end
-
+
# Passes any other method calls on directly to the underlying Logger object created with init. If
# this method gets hit before a call to Nanite::Logger.init has been made, it will call
# Nanite::Logger.init() with no arguments.
View
@@ -74,6 +74,7 @@ class Mapper
# broker is restarted. Default is false. Can be overriden on a per-message basis using the request and push methods.
#
# secure : use Security features of rabbitmq to restrict nanites to themselves
+ #
# prefetch : Sets prefetch (only supported in RabbitMQ >= 1.6)
#
# Connection options:
@@ -129,7 +130,7 @@ def run
@amq = start_amqp(@options)
@job_warden = JobWarden.new(@serializer)
setup_cluster
- Nanite::Log.info('starting mapper')
+ Nanite::Log.info('[setup] starting mapper')
setup_queues
start_console if @options[:console] && !@options[:daemonize]
end
@@ -273,11 +274,12 @@ def setup_offline_queue
def setup_message_queue
amq.queue(identity, :exclusive => true).bind(amq.fanout(identity)).subscribe do |msg|
begin
- msg = serializer.load(msg)
- Nanite::Log.debug("got result from #{msg.from}: #{msg.results.inspect}")
+ msg = serializer.load(msg)
+ Nanite::Log.debug("RECV #{msg.to_s}")
+ Nanite::Log.info("RECV #{msg.to_s([:from])}") unless Nanite::Log.level == Logger::DEBUG
job_warden.process(msg)
rescue Exception => e
- Nanite::Log.error("Error handling result: #{e.message}")
+ Nanite::Log.error("RECV [result] #{e.message}")
end
end
end
@@ -37,6 +37,7 @@ def request(type, payload = '', opts = {}, &blk)
request.persistent = opts.key?(:persistent) ? opts[:persistent] : options[:persistent]
pending_requests[request.token] =
{ :intermediate_handler => opts[:intermediate_handler], :result_handler => blk }
+ Nanite::Log.info("SEND #{request.to_s([:tags, :target])}")
amqp.fanout('request', :no_declare => options[:secure]).publish(serializer.dump(request))
end
Oops, something went wrong.

0 comments on commit 4074f9c

Please sign in to comment.