Permalink
Browse files

Merge branch 'exit_hooks'

Conflicts:
	lib/nanite.rb
  • Loading branch information...
2 parents a125887 + 3b86f38 commit e503ff20e6c5e6908e255efbf11737a694091479 @roidrage roidrage committed Oct 12, 2010
View
@@ -23,6 +23,7 @@
require 'nanite/streaming'
require 'nanite/nanite_dispatcher'
require 'nanite/agent'
+require 'nanite/agent/monitor'
require 'nanite/cluster'
require 'nanite/reaper'
require 'nanite/log'
View
@@ -13,7 +13,28 @@ module Nanite
#
# end
module Actor
-
+ def self.running_jobs?
+ @running_jobs and @running_jobs.any?
+ end
+
+ def self.add_running_job(message)
+ @running_jobs ||= Set.new
+ @running_jobs << message
+ end
+
+ def self.remove_running_job(message, retries = 0)
+ @running_jobs ||= Set.new
+ if @running_jobs.include?(message)
+ @running_jobs.delete(message)
+ elsif retries < 3
+ EM.next_tick {remove_running_job(message, retries + 1)}
+ end
+ end
+
+ def self.running_jobs
+ @running_jobs
+ end
+
def self.included(base)
base.class_eval do
include Nanite::Actor::InstanceMethods
@@ -65,6 +86,13 @@ def request(*args, &blk)
def push(*args)
MapperProxy.instance.push(*args)
end
+
+ def done(message)
+ EM.next_tick do
+ Nanite::Log.debug("Marking job as done")
+ Nanite::Actor.remove_running_job(message)
+ end
+ end
end # InstanceMethods
end # Actor
View
@@ -3,9 +3,8 @@ class Agent
include AMQPHelper
include FileStreaming
include ConsoleHelper
- include DaemonizeHelper
-
- attr_reader :identity, :options, :serializer, :dispatcher, :registry, :amq, :tags
+
+ attr_reader :identity, :options, :serializer, :dispatcher, :registry, :amqp, :tags, :heartbeat
attr_accessor :status_proc
DEFAULT_OPTIONS = COMMON_DEFAULT_OPTIONS.merge({
@@ -82,7 +81,12 @@ class Agent
# and YAML file specify option, Ruby code options take precedence.
def self.start(options = {})
agent = new(options)
- agent.run
+ begin
+ agent.run
+ rescue
+ agent.cleanup
+ raise
+ end
agent
end
@@ -99,23 +103,15 @@ def run
Log.level = @options[:log_level] if @options[:log_level]
@serializer = Serializer.new(@options[:format])
@status_proc = lambda { parse_uptime(`uptime 2> /dev/null`) rescue 'no status' }
- pid_file = PidFile.new(@identity, @options)
- pid_file.check
- if @options[:daemonize]
- daemonize(@identity, @options)
- pid_file.write
- at_exit { pid_file.remove }
- end
- @amq = start_amqp(@options)
+ @monitor = Nanite::Agent::Monitor.new(self, @options)
+ @amqp = start_amqp(@options)
@registry = ActorRegistry.new
- @dispatcher = Dispatcher.new(@amq, @registry, @serializer, @identity, @options)
+ @dispatcher = Dispatcher.new(@amqp, @registry, @serializer, @identity, @options)
setup_mapper_proxy
load_actors
- setup_traps
setup_queue
advertise_services
setup_heartbeat
- at_exit { un_register } unless $TESTING
start_console if @options[:console] && !@options[:daemonize]
end
@@ -134,6 +130,26 @@ def register_security(security, deny_token = "Denied")
@deny_token = deny_token
end
+ def unsubscribe
+ heartbeat.cancel
+ amqp.queue('heartbeat').unsubscribe
+ amqp.queue(identity).unsubscribe
+ end
+
+ def disconnect
+ amqp.close_connection
+ @mapper_proxy.amqp.close_connection
+ end
+
+ def un_register
+ Nanite::Log.info("SEND [un_register]")
+ amqp.fanout('registration', :no_declare => options[:secure]).publish(serializer.dump(UnRegister.new(identity)))
+ end
+
+ def cleanup
+ @monitor.cleanup if @monitor
+ end
+
protected
def set_configuration(opts)
@@ -189,7 +205,7 @@ def receive(packet)
Nanite::Log.warn("RECV NOT AUTHORIZED #{packet.to_s}")
if packet.kind_of?(Request)
r = Result.new(packet.token, packet.reply_to, @deny_token, identity)
- amq.queue(packet.reply_to, :no_declare => options[:secure]).publish(serializer.dump(r))
+ amqp.queue(packet.reply_to, :no_declare => options[:secure]).publish(serializer.dump(r))
end
else
dispatcher.dispatch(packet)
@@ -207,10 +223,10 @@ def tag(*tags)
end
def setup_queue
- if amq.respond_to?(:prefetch) && @options.has_key?(:prefetch)
- amq.prefetch(@options[:prefetch])
+ if amqp.respond_to?(:prefetch) && @options.has_key?(:prefetch)
+ amqp.prefetch(@options[:prefetch])
end
- amq.queue(identity, :durable => true).subscribe(:ack => true) do |info, msg|
+ amqp.queue(identity, :durable => true).subscribe(:ack => true) do |info, msg|
begin
info.ack
receive(serializer.load(msg))
@@ -221,39 +237,19 @@ def setup_queue
end
def setup_heartbeat
- EM.add_periodic_timer(options[:ping_time]) do
- amq.fanout('heartbeat', :no_declare => options[:secure]).publish(serializer.dump(Ping.new(identity, status_proc.call)))
+ @heartbeat = EM.add_periodic_timer(options[:ping_time]) do
+ amqp.fanout('heartbeat', :no_declare => options[:secure]).publish(serializer.dump(Ping.new(identity, status_proc.call)))
end
end
def setup_mapper_proxy
@mapper_proxy = MapperProxy.new(identity, options)
end
- def setup_traps
- ['INT', 'TERM'].each do |sig|
- old = trap(sig) do
- un_register
- amq.instance_variable_get('@connection').close do
- EM.stop
- old.call if old.is_a? Proc
- end
- end
- end
- end
-
- def un_register
- unless @unregistered
- @unregistered = true
- Nanite::Log.info("SEND [un_register]")
- amq.fanout('registration', :no_declare => options[:secure]).publish(serializer.dump(UnRegister.new(identity)))
- end
- end
-
def advertise_services
reg = Register.new(identity, registry.services, status_proc.call, self.tags)
Nanite::Log.info("SEND #{reg.to_s}")
- amq.fanout('registration', :no_declare => options[:secure]).publish(serializer.dump(reg))
+ amqp.fanout('registration', :no_declare => options[:secure]).publish(serializer.dump(reg))
end
def parse_uptime(up)
@@ -0,0 +1,86 @@
+module Nanite
+ class Agent
+ class Monitor
+ include DaemonizeHelper
+
+ attr_reader :agent, :options, :shutting_down, :pid_file
+
+ def initialize(agent, options = {})
+ @agent = agent
+ @options = options
+ setup_pid_file
+ daemonize_agent if options[:daemonize]
+ setup_traps
+ end
+
+ def setup_pid_file
+ @pid_file = PidFile.new(agent.identity, options)
+ @pid_file.check
+ end
+
+ def daemonize_agent
+ daemonize(agent.identity, options)
+ pid_file.write
+ end
+
+ def setup_traps
+ ['INT', 'TERM'].each do |signal|
+ trap signal do
+ graceful_shutdown
+ end
+ end unless $TESTING
+
+ trap 'USR1' do
+ Nanite::Log.info("#{(Nanite::Actor.running_jobs || []).size} running jobs")
+ Nanite::Log.info("Job list:\n#{(Nanite::Actor.running_jobs || []).collect{|job| "#{job.type}: #{job.payload[0..50]}"}}")
+ end
+ end
+
+ def graceful_shutdown
+ exit if shutting_down
+ @shutting_down = true
+ begin
+ initiate_shutdown
+ rescue
+ Nanite::Log.error("Error during graceful shutdown: #{$!.message}\n#{$!.backtrace.join("\n")}")
+ exit
+ end
+ end
+
+ def cleanup
+ pid_file.remove if options[:daemonize]
+ end
+
+ def initiate_shutdown
+ cleanup
+ agent.unsubscribe
+ agent.un_register
+ wait_for_running_actors do
+ shutdown
+ end
+ end
+
+ def shutdown
+ agent.disconnect
+ EM.add_timer(0.5) do
+ EM.stop
+ exit
+ end
+ end
+
+ def wait_for_running_actors(&blk)
+ if options[:graceful] and Nanite::Actor.running_jobs?
+ Nanite::Log.info("Waiting for running jobs to finish")
+ timer = EM.add_periodic_timer(1) do
+ unless Nanite::Actor.running_jobs?
+ timer.cancel
+ blk.call
+ end
+ end
+ else
+ blk.call
+ end
+ end
+ end
+ end
+end
View
@@ -3,7 +3,7 @@ class Queue
# Monkey patch to add :no_declare => true for new queue objects. See the
# explanation for MQ::Exchange#initialize below.
- def initialize mq, name, opts = {}
+ def initialize(mq, name, opts = {})
@mq = mq
@opts = opts
@bindings ||= {}
@@ -21,21 +21,25 @@ def initialize mq, name, opts = {}
# If this flag is true, the server will attempt to requeue the message, potentially then
# delivering it to an alternative subscriber.
#
- def recover requeue = false
+ def recover(requeue = false)
@mq.callback{
@mq.send Protocol::Basic::Recover.new({ :requeue => requeue })
}
self
end
end
+
+ def close_connection
+ @connection.close
+ end
end
# monkey patch to the amqp gem that adds :no_declare => true option for new
# Exchange objects. This allows us to send messeages to exchanges that are
# declared by the mappers and that we have no configuration priviledges on.
# temporary until we get this into amqp proper
MQ::Exchange.class_eval do
- def initialize mq, type, name, opts = {}
+ def initialize(mq, type, name, opts = {})
@mq = mq
@type, @name, @opts = type, name, opts
@mq.exchanges[@name = name] ||= self
@@ -19,6 +19,7 @@ def dispatch(deliverable)
actor = registry.actor_for(prefix)
operation = lambda do
+ increment_running_jobs(deliverable)
begin
intermediate_results_proc = lambda { |*args| self.handle_intermediate_results(actor, meth, deliverable, *args) }
args = [ deliverable.payload ]
@@ -47,6 +48,13 @@ def dispatch(deliverable)
protected
+ def increment_running_jobs(job)
+ EM.next_tick do
+ Nanite::Actor.add_running_job(job)
+ Nanite::Log.debug("Adding running job")
+ end if options[:graceful]
+ end
+
def handle_intermediate_results(actor, meth, deliverable, *args)
messagekey = case args.size
when 1
View
@@ -85,7 +85,7 @@ def delete(nanite)
@redis.srem(tag, nanite)
if @redis.scard(tag) == 0
@redis.del(tag)
- @redis.sdelete("nanitetags", tag)
+ @redis.srem("nanitetags", tag)
end
end
@redis.del nanite
View
@@ -1,6 +1,6 @@
spec = Gem::Specification.new do |s|
s.name = 'nanite'
- s.version = '0.4.1.12'
+ s.version = '0.4.1.15.7'
s.platform = Gem::Platform::RUBY
s.has_rdoc = true
s.extra_rdoc_files = ['README.rdoc', 'LICENSE', 'TODO']
Oops, something went wrong.

0 comments on commit e503ff2

Please sign in to comment.