Permalink
Browse files

Merge branch 'keepalive_pull_request'

  • Loading branch information...
2 parents af5a9a8 + dce2dc1 commit cdbf191bf193270a8121906e1251a9189704f93f @hone committed Dec 1, 2011
Showing with 122 additions and 11 deletions.
  1. +2 −0 lib/resque/tasks.rb
  2. +67 −11 lib/resque/worker.rb
  3. +53 −0 test/worker_test.rb
View
@@ -14,6 +14,8 @@
worker = Resque::Worker.new(*queues)
worker.verbose = ENV['LOGGING'] || ENV['VERBOSE']
worker.very_verbose = ENV['VVERBOSE']
+ worker.keepalive_interval = (ENV['RESQUE_KEEPALIVE_INTERVAL'] || 25)
+ worker.keepalive_expire = (ENV['RESQUE_KEEPALIVE_EXPIRE'] || 60).to_i
rescue Resque::NoQueueError
abort "set QUEUE env var, e.g. $ QUEUE=critical,high rake resque:work"
end
View
@@ -20,8 +20,21 @@ class Worker
# Automatically set if a fork(2) fails.
attr_accessor :cant_fork
+ # How often the keepalive thread runs. Unit is in seconds.
+ attr_accessor :keepalive_interval
+
+ # How long before the worker is considered dead if the keepalive isn't run. Unit is in seconds.
+ # This must be an integer or redis will ERR
+ attr_accessor :keepalive_expire
+
attr_writer :to_s
+ # How long before we prune_dead_workers. Unit is in seconds.
+ # Must ensure we use an integer for redis.expire.
+ def last_prune_expire
+ @last_prune_expire ||= [1, (keepalive_interval - 5).to_i].max
+ end
+
# Returns an array of all worker objects.
def self.all
Array(redis.smembers(:workers)).map { |id| find(id) }.compact
@@ -88,7 +101,8 @@ def self.exists?(worker_id)
# in alphabetical order. Queues can be dynamically added or
# removed without needing to restart workers using this method.
def initialize(*queues)
- @queues = queues.map { |queue| queue.to_s.strip }
+ @queues = queues.map { |queue| queue.to_s.strip }
+ @keepalive_thread = nil
validate_queues
end
@@ -102,6 +116,44 @@ def validate_queues
end
end
+ # This creates the keepalive thread that lets redis know the
+ # worker is still alive. It also prunes workers.
+ def setup_keepalive_thread
+ @keepalive_thread = Thread.new {
+ # stagger keepalive thread to avoid all workers checking
+ # at the same time
+ sleep(keepalive_interval * Kernel.rand)
+ loop do
+ redis.setex(self, keepalive_expire, self)
+ log! "Heartbeat for #{self} | ttl: #{redis.ttl(self)}"
+
+ if set_last_prune
+ log! "Pruning dead workers"
+ Worker.prune_dead_workers
+ end
+ sleep keepalive_interval
+ end
+ }
+ end
+
+ # Try to be the only worker to set last_prune. Returns true if we were able
+ # to, false otherwise.
+ #
+ # We don't execute the transaction at all if the key exists ahead of time.
+ # If it doesnt exist, we set it only if it still does not exist and expire
+ # it in a transaction, which ensures the expiration is always set. Success
+ # is determined by whether we were responsible for setting the key.
+ # Multiple workers setting the expiration at roughly the same time is not a
+ # concern.
+ def set_last_prune
+ return false if redis.get(:last_prune)
+ transaction_results = redis.multi do
+ redis.setnx(:last_prune, Time.now.to_i)
+ redis.expire(:last_prune, last_prune_expire)
+ end
+ transaction_results.first == 1
+ end
+
# This is the main workhorse method. Called on a Worker instance,
# it begins the worker life cycle.
#
@@ -237,9 +289,10 @@ def fork
def startup
enable_gc_optimizations
register_signal_handlers
- prune_dead_workers
+ Worker.prune_dead_workers
run_hook :before_first_fork
register_worker
+ setup_keepalive_thread
# Fix buffering so we can `rake resque:work > resque.log` and
# get output from the child in there.
@@ -338,21 +391,22 @@ def unpause_processing
#
# By checking the current Redis state against the actual
# environment, we can determine if Redis is old and clean it up a bit.
- def prune_dead_workers
- all_workers = Worker.all
- known_workers = worker_pids unless all_workers.empty?
- all_workers.each do |worker|
- host, pid, queues = worker.id.split(':')
- next unless host == hostname
- next if known_workers.include?(pid)
- log! "Pruning dead worker: #{worker}"
- worker.unregister_worker
+ def self.prune_dead_workers
+ all_workers = Array(redis.smembers(:workers))
+ return if all_workers.empty?
+
+ dead_workers = all_workers - redis.mget(*all_workers)
+ dead_workers.each do |worker_id|
+ worker = Worker.find(worker_id)
+ worker.unregister_worker if worker
end
end
# Registers ourself as a worker. Useful when entering the worker
# lifecycle on startup.
def register_worker
+ redis.set(self, self)
+ redis.expire(self, keepalive_expire)
redis.sadd(:workers, self)
started!
end
@@ -385,6 +439,8 @@ def unregister_worker
Stat.clear("processed:#{self}")
Stat.clear("failed:#{self}")
+
+ @keepalive_thread.exit if @keepalive_thread
end
# Given a job, tells Redis we're working on it. Useful for seeing
View
@@ -309,12 +309,65 @@ def self.exception
assert_equal 2, Resque.workers.size
+ # simulate dead workers
+ Resque.redis.del(workerA, workerB)
+
# then we prune them
@worker.work(0) do
assert_equal 1, Resque.workers.size
end
end
+ test "worker registers keepalive key on startup" do
+ worker = Resque::Worker.new(:jobs)
+ worker.instance_variable_set(:@to_s, "#{`hostname`.chomp}:1:jobs")
+ worker.keepalive_interval = 25
+ worker.keepalive_expire = 60 # must be an integer
+ worker.startup
+
+ assert_not_equal -1, Resque.redis.ttl(worker.to_s)
+ assert Resque.redis.get(worker.to_s)
+ worker.unregister_worker
+ end
+
+ test "worker has a heartbeat" do
+ worker = Resque::Worker.new(:jobs)
+ worker.instance_variable_set(:@to_s, "#{`hostname`.chomp}:1:jobs")
+ worker.keepalive_interval = 0.01
+ worker.keepalive_expire = 1 # must be an integer
+ worker.startup
+ Resque.redis.del(worker.to_s)
+ sleep(0.05)
+
+ assert_not_equal -1, Resque.redis.ttl(worker.to_s)
+ assert Resque.redis.get(worker.to_s)
+ worker.unregister_worker
+ end
+
+ test "worker cleans up dead worker info after startup" do
+ workers = [Resque::Worker.new(:jobs), Resque::Worker.new(:jobs)]
+ workers.each_with_index do |worker, index|
+ worker.instance_variable_set(:@to_s, "#{`hostname`.chomp}:#{index}:jobs")
+ worker.keepalive_interval = 0.01
+ worker.keepalive_expire = 1 # must be an integer
+ worker.startup
+ end
+
+ # simulate dead worker
+ Resque.redis.del(workers.first)
+ workers.first.instance_variable_get(:@keepalive_thread).exit
+
+ sleep(0.05)
+ # make sure set_last_prune is run
+ assert_not_equal -1, Resque.redis.ttl(:last_prune)
+ # simulate last_prune expire
+ Resque.redis.del(:last_prune)
+ sleep(0.05)
+
+ assert_not_equal -1, Resque.redis.ttl(:last_prune)
+ assert_equal 1, Resque.workers.size
+ end
+
test "worker_pids returns pids" do
known_workers = @worker.worker_pids
assert !known_workers.empty?

0 comments on commit cdbf191

Please sign in to comment.