Skip to content

Commit

Permalink
Implemented pre_term_timeout with test
Browse files Browse the repository at this point in the history
  • Loading branch information
Ethan Langevin committed Apr 16, 2014
1 parent 7cc6869 commit 296edf2
Show file tree
Hide file tree
Showing 3 changed files with 102 additions and 12 deletions.
1 change: 1 addition & 0 deletions lib/resque/tasks.rb
Expand Up @@ -19,6 +19,7 @@
worker.very_verbose = ENV['VVERBOSE']
end
worker.term_timeout = ENV['RESQUE_TERM_TIMEOUT'] || 4.0
worker.pre_term_timeout = ENV['RESQUE_PRE_TERM_TIMEOUT'] || 0.0
worker.term_child = ENV['TERM_CHILD']
worker.run_at_exit_hooks = ENV['RUN_AT_EXIT_HOOKS']
rescue Resque::NoQueueError
Expand Down
39 changes: 27 additions & 12 deletions lib/resque/worker.rb
Expand Up @@ -51,6 +51,8 @@ def decode(object)

attr_accessor :term_timeout

attr_accessor :pre_term_timeout

# decide whether to use new_kill_child logic
attr_accessor :term_child

Expand Down Expand Up @@ -390,11 +392,11 @@ def register_signal_handlers

def unregister_signal_handlers
trap('TERM') do
trap ('TERM') do
# ignore subsequent terms
end
raise TermException.new("SIGTERM")
end
trap ('TERM') do
# ignore subsequent terms
end
raise TermException.new("SIGTERM")
end
trap('INT', 'DEFAULT')

begin
Expand Down Expand Up @@ -455,13 +457,14 @@ def kill_child
# wait 5 seconds, and then a KILL signal if it has not quit
def new_kill_child
if @child
unless Process.waitpid(@child, Process::WNOHANG)
unless child_already_exited?
if pre_term_timeout.to_f > 0.0
log! "Waiting #{pre_term_timeout.to_f}s for child process to exit"
return if wait_for_child_exit(pre_term_timeout)
end
log! "Sending TERM signal to child #{@child}"
Process.kill("TERM", @child)
(term_timeout.to_f * 10).round.times do |i|
sleep(0.1)
return if Process.waitpid(@child, Process::WNOHANG)
end
return if wait_for_child_exit(term_timeout)
log! "Sending KILL signal to child #{@child}"
Process.kill("KILL", @child)
else
Expand All @@ -472,6 +475,18 @@ def new_kill_child
log! "Child #{@child} already quit and reaped."
end

def child_already_exited?
Process.waitpid(@child, Process::WNOHANG)
end

def wait_for_child_exit(timeout)
(timeout.to_f * 10).round.times do |i|
sleep(0.1)
return true if child_already_exited?
end
false
end

# are we paused?
def paused?
@paused
Expand Down Expand Up @@ -508,9 +523,9 @@ def prune_dead_workers
worker_queues = worker_queues_raw.split(",")
unless @queues.include?("*") || (worker_queues.to_set == @queues.to_set)
# If the worker we are trying to prune does not belong to the queues
# we are listening to, we should not touch it.
# we are listening to, we should not touch it.
# Attempt to prune a worker from different queues may easily result in
# an unknown class exception, since that worker could easily be even
# an unknown class exception, since that worker could easily be even
# written in different language.
next
end
Expand Down
74 changes: 74 additions & 0 deletions test/worker_test.rb
Expand Up @@ -1010,6 +1010,80 @@ def self.perform(run_time)
end
end

if !defined?(RUBY_ENGINE) || defined?(RUBY_ENGINE) && RUBY_ENGINE != "jruby"
{
'job finishes in allotted time' => 0.5,
'job takes too long' => 1.1
}.each do |scenario, run_time|
test "gives time to finish before sending term if pre_term_timeout is set: when #{scenario}" do
begin
class LongRunningJob
@queue = :long_running_job

def self.perform(run_time)
Resque.redis.client.reconnect # get its own connection
Resque.redis.rpush('pre-term-timeout-test:start', Process.pid)
sleep run_time
Resque.redis.rpush('pre-term-timeout-test:result', 'Finished Normally')
rescue Resque::TermException => e
Resque.redis.rpush('pre-term-timeout-test:result', %Q(Caught TermException: #{e.inspect}))
ensure
Resque.redis.rpush('pre-term-timeout-test:final', 'exiting.')
end
end

pre_term_timeout = 1
Resque.enqueue(LongRunningJob, run_time)

worker_pid = Kernel.fork do
# reconnect to redis
Resque.redis.client.reconnect

# ensure we fork (in worker)
$TESTING = false

worker = Resque::Worker.new(:long_running_job)
worker.pre_term_timeout = pre_term_timeout
worker.term_timeout = 2
worker.term_child = 1

worker.work(0)
exit!
end

# ensure the worker is started
start_status = Resque.redis.blpop('pre-term-timeout-test:start', 5)
assert_not_nil start_status
child_pid = start_status[1].to_i
assert_operator child_pid, :>, 0

# send signal to abort the worker
Process.kill('TERM', worker_pid)
Process.waitpid(worker_pid)

# wait to see how it all came down
result = Resque.redis.blpop('pre-term-timeout-test:result', 5)
assert_not_nil result

if run_time >= pre_term_timeout
assert !result[1].start_with?('Finished Normally'), 'Job finished normally when running over pre term timeout'
assert result[1].start_with?('Caught TermException'), 'TermException not raised in child.'
else
assert result[1].start_with?('Finished Normally'), 'Job did not finish normally. Pre term timeout too short?'
assert !result[1].start_with?('Caught TermException'), 'TermException raised in child.'
end

# ensure that the child pid is no longer running
child_still_running = !(`ps -p #{child_pid.to_s} -o pid=`).empty?
assert !child_still_running
ensure
remaining_keys = Resque.redis.keys('pre-term-timeout-test:*') || []
Resque.redis.del(*remaining_keys) unless remaining_keys.empty?
end
end
end
end

test "displays warning when not using term_child" do
begin
$TESTING = false
Expand Down

0 comments on commit 296edf2

Please sign in to comment.