Unlock jobs after worker crash - with ruby 1.8.7 compatibility for DJ V3 #429

Closed
wants to merge 3 commits into from

4 participants

@jeffdeville

I needed Ivan Vanderbyl's patch but was on DJ V3 and Ruby 1.8.7, so I've shamelessly stolen his fantastic work and got it working in DJ V3. A number of folks have run into this problem, so I figured I'd try to raise awareness a bit.

Here was Ivan's original pull request

@pboling

I am having lots of trouble with crashed workers... I wonder if this will help me... Will check.

@albus522
Collective Idea member

Direct lookups against the backend are no longer performed in the delayed_job gem. This would have to be refactored to work against the various DJ backends. https://github.com/collectiveidea/delayed_job/wiki/backends

@albus522 albus522 closed this
Showing with 33 additions and 1 deletion.
  1. +1 −1 delayed_job.gemspec
  2. +32 −0 lib/delayed/worker.rb
2 delayed_job.gemspec
@@ -1,4 +1,4 @@
-# -*- encoding: utf-8 -*-
+# encoding: utf-8
 Gem::Specification.new do |s|
   s.name = 'delayed_job'
32 lib/delayed/worker.rb
@@ -127,6 +127,8 @@ def start
       say "Starting job worker"
+      recover_crashed_jobs!
+
       self.class.lifecycle.run_callbacks(:execute, self) do
         loop do
           self.class.lifecycle.run_callbacks(:loop, self) do
@@ -228,6 +230,36 @@ def max_attempts(job)
     protected
+    # Finds locked jobs in database, then checks if the process IDs exist, if not, triggers recovery action.
+    def recover_crashed_jobs!
+      Delayed::Job.find_each do |job|
+        if job.locked_by
+          match = /host:([^\s]+) pid:(\d+)/.match(job.locked_by)
+          host_name, pid = match[1], match[2].to_i
+          if !is_process_running?(pid) && Socket.gethostname == host_name
+            say job.last_error = "* [JOB] Worker process crashed #{job.locked_by}, recovering job..."
+            job.save
+            if job.payload_object.respond_to? :recover
+              say "* [JOB] Running recover hook"
+              job.payload_object.recover
+            end
+            reschedule(job)
+          end
+        end
+      end
+    rescue => e
+      say "* [ERROR] #{e.message}\n" + e.backtrace.join("\n")
+    end
+
+    def is_process_running?(pid)
+      begin
+        Process.getpgid( pid )
+        true
+      rescue Errno::ESRCH
+        false
+      end
+    end
+
     def handle_failed_job(job, error)
       job.last_error = "{#{error.message}\n#{error.backtrace.join("\n")}"
       say "#{job.name} failed with #{error.class.name}: #{error.message} - #{job.attempts} failed attempts", Logger::ERROR
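
For readers who want to try the host/pid check from `recover_crashed_jobs!` outside of Delayed Job, here is a minimal standalone sketch. It assumes `locked_by` has the `host:<hostname> pid:<pid>` shape that the regex in the diff expects. The helper names (`parse_locked_by`, `process_running?`, `crashed_local_worker?`) are hypothetical and not part of the gem. Unlike the diff, the sketch skips a `locked_by` value that does not match the pattern instead of raising on a `nil` match.

```ruby
require 'socket'

# Same shape as the regex in the diff: "host:<hostname> pid:<pid>".
LOCKED_BY_PATTERN = /host:(\S+) pid:(\d+)/

# Hypothetical helper: returns [host, pid], or nil when the string
# does not look like a worker name of the expected shape.
def parse_locked_by(locked_by)
  match = LOCKED_BY_PATTERN.match(locked_by.to_s)
  return nil unless match
  [match[1], match[2].to_i]
end

# Process.getpgid raises Errno::ESRCH when no process has this pid.
def process_running?(pid)
  Process.getpgid(pid)
  true
rescue Errno::ESRCH
  false
end

# A lock counts as stale only when it was taken on this host and the
# owning process is gone. A pid from another host tells us nothing.
def crashed_local_worker?(locked_by, local_host = Socket.gethostname)
  host, pid = parse_locked_by(locked_by)
  return false if host.nil?
  host == local_host && !process_running?(pid)
end
```

The hostname comparison comes first on purpose: a pid lookup is only meaningful on the machine that took the lock, so locks held by workers on other hosts are left alone.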