Skip to content

Commit

Permalink
Merge commit 'ezmobius/master' into merge_ezmobius
Browse files Browse the repository at this point in the history
Conflicts:
	lib/nanite/cluster.rb
	lib/nanite/job.rb
	lib/nanite/mapper.rb
  • Loading branch information
roidrage committed Jun 26, 2009
2 parents ecd142f + 8cd5c3f commit 0719f40
Show file tree
Hide file tree
Showing 10 changed files with 137 additions and 125 deletions.
2 changes: 1 addition & 1 deletion Rakefile
Expand Up @@ -10,7 +10,7 @@ end
require 'rake/clean'

GEM = "nanite"
VER = "0.4.0"
VER = "0.4.1"
AUTHOR = "Ezra Zygmuntowicz"
EMAIL = "ezra@engineyard.com"
HOMEPAGE = "http://github.com/ezmobius/nanite"
Expand Down
52 changes: 26 additions & 26 deletions lib/nanite/agent.rb
Expand Up @@ -166,32 +166,27 @@ def load_actors
end

def receive(packet)
begin
packet = serializer.load(packet)
case packet
when Advertise
Nanite::Log.debug("handling Advertise: #{packet}")
advertise_services
when Request, Push
Nanite::Log.debug("handling Request: #{packet}")
if @security && !@security.authorize(packet)
if packet.kind_of?(Request)
r = Result.new(packet.token, packet.reply_to, @deny_token, identity)
amq.queue(packet.reply_to, :no_declare => options[:secure]).publish(serializer.dump(r))
end
else
dispatcher.dispatch(packet)
case packet
when Advertise
Nanite::Log.debug("handling Advertise: #{packet.inspect}")
advertise_services
when Request, Push
Nanite::Log.debug("handling Request: #{packet.inspect}")
if @security && !@security.authorize(packet)
if packet.kind_of?(Request)
r = Result.new(packet.token, packet.reply_to, @deny_token, identity)
amq.queue(packet.reply_to, :no_declare => options[:secure]).publish(serializer.dump(r))
end
when Result
Nanite::Log.debug("handling Result: #{packet}")
@mapper_proxy.handle_result(packet)
when IntermediateMessage
Nanite::Log.debug("handling Intermediate Result: #{packet}")
@mapper_proxy.handle_intermediate_result(packet)
else
dispatcher.dispatch(packet)
end
rescue Exception => e
Nanite::Log.error("Error handling packet #{packet.inspect}:\n#{e.message} at #{e.backtrace[0]}")
end
when Result
Nanite::Log.debug("handling Result: #{packet.inspect}")
@mapper_proxy.handle_result(packet)
when IntermediateMessage
Nanite::Log.debug("handling Intermediate Result: #{packet.inspect}")
@mapper_proxy.handle_intermediate_result(packet)
end
end

def tag(*tags)
Expand All @@ -201,8 +196,13 @@ def tag(*tags)

def setup_queue
amq.queue(identity, :durable => true).subscribe(:ack => true) do |info, msg|
info.ack
receive(msg)
begin
info.ack
packet = serializer.load(msg)
receive(packet)
rescue Exception => e
Nanite::Log.error("Error handling packet: #{e.message}")
end
end
end

Expand Down
100 changes: 53 additions & 47 deletions lib/nanite/cluster.rb
Expand Up @@ -78,35 +78,29 @@ def handle_ping(ping)
else
amq.queue(ping.identity).publish(serializer.dump(Advertise.new))
end
rescue Exception => e
Nanite::Log.error("Error handling ping #{ping.inspect}:\n#{e.message} at #{e.backtrace[0]}")
end
end

# forward request coming from agent
def handle_request(request)
begin
if @security.authorize_request(request)
intm_handler = lambda do |result, job|
result = IntermediateMessage.new(request.token, job.request.from, mapper.identity, nil, result)
forward_response(result, request.persistent)
end

result = Result.new(request.token, request.from, nil, mapper.identity)
ok = mapper.send_request(request, :intermediate_handler => intm_handler) do |res|
result.results = res
forward_response(result, request.persistent)
end

if ok == false
forward_response(result, request.persistent)
end
else
Nanite::Log.warning("request #{request.inspect} not authorized")
if @security.authorize_request(request)
intm_handler = lambda do |result, job|
result = IntermediateMessage.new(request.token, job.request.from, mapper.identity, nil, result)
forward_response(result, request.persistent)
end

result = Result.new(request.token, request.from, nil, mapper.identity)
ok = mapper.send_request(request, :intermediate_handler => intm_handler) do |res|
result.results = res
forward_response(result, request.persistent)
end

if ok == false
forward_response(result, request.persistent)
end
rescue Exception => e
Nanite::Log.error("Error handling request #{request.inspect}:\n#{e.message} at #{e.backtrace[0]}")
end
else
Nanite::Log.warning("request #{request.inspect} not authorized")
end
end

# forward response back to agent that originally made the request
Expand Down Expand Up @@ -160,44 +154,56 @@ def setup_queues
end

def setup_heartbeat_queue
if shared_state?
amq.queue("heartbeat").bind(amq.fanout('heartbeat', :durable => true)).subscribe do |ping|
Nanite::Log.debug('got heartbeat')
handle_ping(serializer.load(ping))
handler = lambda do |ping|
begin
ping = serializer.load(ping)
Nanite::Log.debug("got heartbeat from #{ping.identity}") if ping.respond_to?(:identity)
handle_ping(ping)
rescue Exception => e
Nanite::Log.error("Error handling heartbeat: #{e.message}")
end
end
hb_fanout = amq.fanout('heartbeat', :durable => true)
if shared_state?
amq.queue("heartbeat").bind(hb_fanout).subscribe &handler
else
amq.queue("heartbeat-#{identity}", :exclusive => true).bind(amq.fanout('heartbeat', :durable => true)).subscribe do |ping|
Nanite::Log.debug('got heartbeat')
handle_ping(serializer.load(ping))
end
amq.queue("heartbeat-#{identity}", :exclusive => true).bind(hb_fanout).subscribe &handler
end
end

def setup_registration_queue
if shared_state?
amq.queue("registration").bind(amq.fanout('registration', :durable => true)).subscribe do |msg|
Nanite::Log.debug('got registration')
register(serializer.load(msg))
handler = lambda do |msg|
begin
msg = serializer.load(msg)
Nanite::Log.debug("got registration from #{msg.identity}")
register(msg)
rescue Exception => e
Nanite::Log.error("Error handling registration: #{e.message}")
end
end
reg_fanout = amq.fanout('registration', :durable => true)
if shared_state?
amq.queue("registration").bind(reg_fanout).subscribe &handler
else
amq.queue("registration-#{identity}", :exclusive => true).bind(amq.fanout('registration', :durable => true)).subscribe do |msg|
Nanite::Log.debug('got registration')
register(serializer.load(msg))
end
amq.queue("registration-#{identity}", :exclusive => true).bind(reg_fanout).subscribe &handler
end
end

def setup_request_queue
if shared_state?
amq.queue("request").bind(amq.fanout('request', :durable => true)).subscribe do |msg|
Nanite::Log.debug('got request')
handle_request(serializer.load(msg))
handler = lambda do |msg|
begin
msg = serializer.load(msg)
Nanite::Log.debug("got request from #{msg.from} of type #{msg.type}")
handle_request(msg)
rescue Exception => e
Nanite::Log.error("Error handling request: #{e.message}")
end
end
req_fanout = amq.fanout('request', :durable => true)
if shared_state?
amq.queue("request").bind(req_fanout).subscribe &handler
else
amq.queue("request-#{identity}", :exclusive => true).bind(amq.fanout('request', :durable => true)).subscribe do |msg|
Nanite::Log.debug('got request')
handle_request(serializer.load(msg))
end
amq.queue("request-#{identity}", :exclusive => true).bind(req_fanout).subscribe &handler
end
end

Expand Down
61 changes: 28 additions & 33 deletions lib/nanite/job.rb
Expand Up @@ -14,50 +14,45 @@ def new_job(request, targets, inthandler = nil, blk = nil)
end

def process(msg)
begin
msg = serializer.load(msg)
Nanite::Log.debug("processing message: #{msg.inspect}")
Nanite::Log.debug("processing message: #{msg.inspect}")

if job = jobs[msg.token]
job.process(msg)
if job = jobs[msg.token]
job.process(msg)

if job.intermediate_handler && (job.pending_keys.size > 0)
if job.intermediate_handler && (job.pending_keys.size > 0)

unless job.pending_keys.size == 1
raise "IntermediateMessages are currently dispatched as they arrive, shouldn't have more than one key in pending_keys: #{job.pending_keys.inspect}"
end
unless job.pending_keys.size == 1
raise "IntermediateMessages are currently dispatched as they arrive, shouldn't have more than one key in pending_keys: #{job.pending_keys.inspect}"
end

key = job.pending_keys.first
handler = job.intermediate_handler_for_key(key)
if handler
case handler.arity
when 2
handler.call(job.intermediate_state[msg.from][key].last, job)
when 3
handler.call(key, msg.from, job.intermediate_state[msg.from][key].last)
when 4
handler.call(key, msg.from, job.intermediate_state[msg.from][key].last, job)
end
key = job.pending_keys.first
handler = job.intermediate_handler_for_key(key)
if handler
case handler.arity
when 2
handler.call(job.intermediate_state[msg.from][key].last, job)
when 3
handler.call(key, msg.from, job.intermediate_state[msg.from][key].last)
when 4
handler.call(key, msg.from, job.intermediate_state[msg.from][key].last, job)
end

job.reset_pending_intermediate_state_keys
end

if job.completed?
jobs.delete(job.token)
if job.completed
case job.completed.arity
when 1
job.completed.call(job.results)
when 2
job.completed.call(job.results, job)
end
job.reset_pending_intermediate_state_keys
end

if job.completed?
jobs.delete(job.token)
if job.completed
case job.completed.arity
when 1
job.completed.call(job.results)
when 2
job.completed.call(job.results, job)
end
end
end
end
rescue Exception => e
Nanite::Log.error("Error handling message #{msg.inspect}:\n#{e.message} at #{e.backtrace[0]}")
end
end # JobWarden

Expand Down
3 changes: 2 additions & 1 deletion lib/nanite/log.rb
Expand Up @@ -30,6 +30,7 @@ def init(identity = nil, path = false)
def level=(loglevel)
init() unless @logger
loglevel = loglevel.intern if loglevel.is_a?(String)
@logger.info("Setting log level to #{loglevel.to_s.upcase}")
case loglevel
when :debug
@logger.level = Logger::DEBUG
Expand Down Expand Up @@ -60,4 +61,4 @@ def method_missing(method_symbol, *args)

end # class << self
end
end
end
12 changes: 9 additions & 3 deletions lib/nanite/mapper.rb
Expand Up @@ -260,7 +260,13 @@ def setup_offline_queue

def setup_message_queue
amq.queue(identity, :exclusive => true).bind(amq.fanout(identity)).subscribe do |msg|
job_warden.process(msg)
begin
msg = serializer.load(msg)
Nanite::Log.debug("got result from #{msg.from}: #{msg.results.inspect}")
job_warden.process(msg)
rescue Exception => e
Nanite::Log.error("Error handling result: #{e.message}")
end
end
end

Expand All @@ -269,8 +275,8 @@ def setup_logging
if @options[:daemonize]
log_path = (@options[:log_dir] || @options[:root] || Dir.pwd)
end
Log.init(@identity, log_path)
Log.level = @options[:log_level] if @options[:log_level]
Nanite::Log.init(@identity, log_path)
Nanite::Log.level = @options[:log_level] if @options[:log_level]
end

def setup_cluster
Expand Down
1 change: 1 addition & 0 deletions lib/nanite/security/secure_serializer.rb
Expand Up @@ -59,6 +59,7 @@ def self.load(json)
JSON.load(jsn) if jsn
rescue Exception => e
Nanite::Log.error("Loading of secure packet failed: #{e.message}\n#{e.backtrace.join("\n")}")
raise
end
end

Expand Down
4 changes: 2 additions & 2 deletions spec/agent_spec.rb
Expand Up @@ -197,8 +197,8 @@
@amq = mock("AMQueue", :queue => mock("queue", :subscribe => {}, :publish => {}), :fanout => mock("fanout", :publish => nil))
MQ.stub!(:new).and_return(@amq)
serializer = Nanite::Serializer.new
@request = serializer.dump(Nanite::Request.new('/foo/bar', ''))
@push = serializer.dump(Nanite::Push.new('/foo/bar', ''))
@request = Nanite::Request.new('/foo/bar', '')
@push = Nanite::Push.new('/foo/bar', '')
@agent = Nanite::Agent.start
end

Expand Down
9 changes: 9 additions & 0 deletions spec/cluster_spec.rb
Expand Up @@ -337,16 +337,25 @@
end

it "should serialize request before publishing it" do
@request.should_receive(:target=).with(@target)
@request.should_receive(:target=)
@request.should_receive(:target)
@serializer.should_receive(:dump).with(@request).and_return("serialized_request")
@cluster.publish(@request, @target)
end

it "should publish request to target queue" do
@request.should_receive(:target=).with(@target)
@request.should_receive(:target=)
@request.should_receive(:target)
@queue.should_receive(:publish).with("dumped_value", anything())
@cluster.publish(@request, @target)
end

it "should persist request based on request setting" do
@request.should_receive(:target=).with(@target)
@request.should_receive(:target=)
@request.should_receive(:target)
@request.should_receive(:persistent).and_return(false)
@queue.should_receive(:publish).with(anything(), { :persistent => false })
@cluster.publish(@request, @target)
Expand Down

0 comments on commit 0719f40

Please sign in to comment.